kubernix 0.3.1

Kubernetes development cluster bootstrapping with Nix packages
Documentation
//! Generic process lifecycle management.
//!
//! Provides the [`Process`] abstraction that spawns a child process,
//! watches it for unexpected exits, waits for readiness based on log
//! output patterns, and handles graceful shutdown via SIGTERM.

use crate::system::System;
use anyhow::{Context, Result, bail};
use crossbeam_channel::{Receiver, Sender, bounded};
use log::{debug, error, info};
use nix::{
    sys::signal::{Signal, kill},
    unistd::Pid,
};
use serde::{Deserialize, Serialize};
use std::{
    fs::{self, File, create_dir_all},
    io::{BufRead, BufReader},
    path::{Path, PathBuf},
    process::{Command, Stdio},
    thread::{JoinHandle, sleep, spawn},
    time::{Duration, Instant},
};

/// A general process abstraction
///
/// Holds the bookkeeping for one spawned child process: its PID, the log
/// file capturing its output, and the channels shared with the watcher
/// thread that waits for the child to exit.
pub struct Process {
    /// Command name as passed to `Process::start`; used in log and error messages.
    command: String,
    /// Receives a message from the watcher thread when the child exited
    /// without a preceding stop request.
    died: Receiver<()>,
    /// Used by `stop` to tell the watcher thread that the upcoming exit is intended.
    kill: Sender<()>,
    /// File that receives both stdout and stderr of the child.
    log_file: PathBuf,
    /// Identifier used in log and error messages.
    name: String,
    /// OS process id of the spawned child.
    pid: u32,
    /// Maximum number of seconds `wait_ready` waits for the readiness pattern.
    readiness_timeout: u64,
    /// Handle of the watcher thread; taken (left as `None`) once `stop` has joined it.
    watch: Option<JoinHandle<Result<()>>>,
}

/// The trait to stop something
///
/// Implemented by handles of running things that can be shut down on request.
pub trait Stoppable {
    /// Stop the process
    ///
    /// # Errors
    ///
    /// Returns an error if the implementor was not able to stop.
    fn stop(&mut self) -> Result<()>;
}

/// A running process handle that can be stopped during cluster shutdown.
///
/// Boxed as a trait object so that different [`Stoppable`] implementors can
/// be handled uniformly; the `Send + Sync` bounds allow the handle to be
/// moved to and shared between threads.
pub type Started = Box<dyn Stoppable + Send + Sync>;

/// Ordered collection of running processes, stopped in reverse order during cleanup.
pub type Stoppables = Vec<Started>;

/// The result of starting a component process, either a stoppable handle or an error.
pub type ProcessState = Result<Started>;

/// The command line persisted as `run.json` in the process directory.
///
/// It is written the first time a process is started in a directory and read
/// back on later starts, so that the process is re-executed with the very
/// same executable and arguments.
#[derive(Deserialize, Serialize)]
struct Run {
    /// Path of the executable to spawn.
    command: PathBuf,
    /// Arguments handed to the executable.
    args: Vec<String>,
}

impl Process {
    /// Creates a new `Process` instance by spawning the provided `command` and `args`.
    /// If the process creation fails, an `Error` will be returned.
    pub fn start(dir: &Path, identifier: &str, command: &str, args: &[&str]) -> Result<Process> {
        // Prepare the commands
        if command.is_empty() {
            bail!("No valid command provided")
        }
        info!("Starting {}", identifier);

        // Write the executed command into the dir
        create_dir_all(dir)?;

        // If the run file exists, execute only that one
        let run_file = dir.join("run.json");
        let run = if !run_file.exists() {
            debug!(
                "No previous run file '{}' found, writing new one",
                run_file.display()
            );
            // Write the run file
            let f = Run {
                command: System::find_executable(command)?,
                args: args.iter().map(|x| (*x).to_string()).collect(),
            };
            fs::write(&run_file, serde_json::to_string_pretty(&f)?)?;
            f
        } else {
            debug!("Re using run file '{}'", run_file.display());
            let contents = fs::read_to_string(&run_file)?;
            serde_json::from_str(&contents)?
        };

        // Prepare the log dir and file
        let mut log_file = dir.join(command);
        log_file.set_extension("log");
        let out_file = File::create(&log_file)?;
        let err_file = out_file.try_clone()?;

        // Spawn the process child
        let mut child = Command::new(run.command)
            .args(run.args)
            .stderr(Stdio::from(err_file))
            .stdout(Stdio::from(out_file))
            .spawn()
            .with_context(|| format!("Unable to start process '{}' ({})", identifier, command,))?;

        // Start the watcher thread
        let (kill, killed) = bounded(1);
        let (dead, died) = bounded(1);
        let c = command.to_owned();
        let n = identifier.to_owned();
        let pid = child.id();
        let lf = log_file.clone();
        let watch = spawn(move || {
            // Wait for the process to exit
            let status = child.wait()?;

            // No kill send, we assume that the process died
            if killed.try_recv().is_err() {
                error!("{} ({}) died unexpectedly", n, c);
                Self::dump_log_tail(&lf, &n);
                dead.send(())?;
            } else {
                info!("{} stopped", n);
            }
            debug!("{} ({}) {}", n, c, status);
            Ok(())
        });

        Ok(Process {
            command: command.into(),
            died,
            kill,
            log_file,
            name: identifier.into(),
            pid,
            readiness_timeout: 120,
            watch: Some(watch),
        })
    }

    /// Wait for the process to become ready, by searching for the pattern in
    /// every line of its output.
    pub fn wait_ready(&mut self, pattern: &str) -> Result<()> {
        debug!(
            "Waiting for process '{}' ({}) to become ready with pattern: '{}'",
            self.name, self.command, pattern
        );
        let now = Instant::now();
        let file = File::open(&self.log_file)?;
        let mut reader = BufReader::new(file);
        let mut line = String::new();

        while now.elapsed().as_secs() < self.readiness_timeout {
            line.clear();
            reader.read_line(&mut line)?;

            if line.is_empty() {
                if self.died.try_recv().is_ok() {
                    bail!(
                        "{} ({}) died before becoming ready",
                        self.name,
                        self.command
                    )
                }
                sleep(Duration::from_millis(10));
                continue;
            }

            if line.contains(pattern) {
                info!("{} is ready", self.name);
                debug!("Found pattern '{}' in line '{}'", pattern, line.trim());
                return Ok(());
            }
        }

        // Dump log before stopping so the file is still intact
        Self::dump_log_tail(&self.log_file, &self.name);
        self.stop()?;
        bail!(
            "Timed out after {}s waiting for process '{}' ({}) to become ready \
             with pattern '{}' (log: {})",
            self.readiness_timeout,
            self.name,
            self.command,
            pattern,
            self.log_file.display(),
        )
    }

    /// Dump the last lines of a process log file for debugging
    fn dump_log_tail(log_file: &Path, name: &str) {
        if let Ok(content) = fs::read_to_string(log_file) {
            let lines: Vec<&str> = content.lines().collect();
            let start = lines.len().saturating_sub(20);
            error!("Last log lines of {}:", name);
            for line in &lines[start..] {
                error!("  {}", line);
            }
        }
    }
}

impl Stoppable for Process {
    /// Stopping the process by killing it
    fn stop(&mut self) -> Result<()> {
        debug!("Stopping process {} (via {})", self.name, self.command);

        // Indicate that this shutdown is intended
        self.kill.send(()).with_context(|| {
            format!(
                "Unable to send kill signal to process {} (via {})",
                self.name, self.command,
            )
        })?;

        // Send SIGTERM to the process
        let pid = i32::try_from(self.pid)
            .with_context(|| format!("PID {} exceeds i32 range", self.pid))?;
        kill(Pid::from_raw(pid), Signal::SIGTERM)?;

        // Join the waiting thread
        if let Some(handle) = self.watch.take()
            && handle.join().is_err()
        {
            bail!(
                "Unable to stop process {} (via {})",
                self.name,
                self.command
            );
        }
        debug!("Process {} (via {}) stopped", self.name, self.command);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Spawning an existing executable works.
    #[test]
    fn start_success() -> Result<()> {
        let workdir = tempdir()?;
        Process::start(workdir.path(), "", "echo", &[])?;
        Ok(())
    }

    /// An empty command is rejected.
    #[test]
    fn start_failure_no_command() -> Result<()> {
        let workdir = tempdir()?;
        let started = Process::start(workdir.path(), "", "", &[]);
        assert!(started.is_err());
        Ok(())
    }

    /// A command that does not exist cannot be started.
    #[test]
    fn start_failure_invalid_command() -> Result<()> {
        let workdir = tempdir()?;
        let started = Process::start(workdir.path(), "", "invalid_command", &[]);
        assert!(started.is_err());
        Ok(())
    }

    /// The readiness pattern is found in the output of the process.
    #[test]
    fn wait_ready_success() -> Result<()> {
        let workdir = tempdir()?;
        let mut process = Process::start(workdir.path(), "", "echo", &["test"])?;
        process.wait_ready("test")?;
        Ok(())
    }

    /// Waiting for a pattern which is never printed fails.
    #[test]
    fn wait_ready_failure() -> Result<()> {
        let workdir = tempdir()?;
        let mut process = Process::start(workdir.path(), "", "echo", &["test"])?;
        process.readiness_timeout = 1;
        let ready = process.wait_ready("invalid");
        assert!(ready.is_err());
        Ok(())
    }

    /// A long running process can be stopped.
    #[test]
    fn stop_success() -> Result<()> {
        let workdir = tempdir()?;
        let mut process = Process::start(workdir.path(), "", "sleep", &["500"])?;
        process.stop()?;
        Ok(())
    }
}