use crate::system::System;
use anyhow::{Context, Result, bail};
use crossbeam_channel::{Receiver, Sender, bounded};
use log::{debug, error, info};
use nix::{
sys::signal::{Signal, kill},
unistd::Pid,
};
use serde::{Deserialize, Serialize};
use std::{
fs::{self, File, create_dir_all},
io::{BufRead, BufReader},
path::{Path, PathBuf},
process::{Command, Stdio},
thread::{JoinHandle, sleep, spawn},
time::{Duration, Instant},
};
pub struct Process {
command: String,
died: Receiver<()>,
kill: Sender<()>,
log_file: PathBuf,
name: String,
pid: u32,
readiness_timeout: u64,
watch: Option<JoinHandle<Result<()>>>,
}
pub trait Stoppable {
fn stop(&mut self) -> Result<()>;
}
pub type Started = Box<dyn Stoppable + Send + Sync>;
pub type Stoppables = Vec<Started>;
pub type ProcessState = Result<Started>;
#[derive(Deserialize, Serialize)]
struct Run {
command: PathBuf,
args: Vec<String>,
}
impl Process {
pub fn start(dir: &Path, identifier: &str, command: &str, args: &[&str]) -> Result<Process> {
if command.is_empty() {
bail!("No valid command provided")
}
info!("Starting {}", identifier);
create_dir_all(dir)?;
let run_file = dir.join("run.json");
let run = if !run_file.exists() {
debug!(
"No previous run file '{}' found, writing new one",
run_file.display()
);
let f = Run {
command: System::find_executable(command)?,
args: args.iter().map(|x| (*x).to_string()).collect(),
};
fs::write(&run_file, serde_json::to_string_pretty(&f)?)?;
f
} else {
debug!("Re using run file '{}'", run_file.display());
let contents = fs::read_to_string(&run_file)?;
serde_json::from_str(&contents)?
};
let mut log_file = dir.join(command);
log_file.set_extension("log");
let out_file = File::create(&log_file)?;
let err_file = out_file.try_clone()?;
let mut child = Command::new(run.command)
.args(run.args)
.stderr(Stdio::from(err_file))
.stdout(Stdio::from(out_file))
.spawn()
.with_context(|| format!("Unable to start process '{}' ({})", identifier, command,))?;
let (kill, killed) = bounded(1);
let (dead, died) = bounded(1);
let c = command.to_owned();
let n = identifier.to_owned();
let pid = child.id();
let lf = log_file.clone();
let watch = spawn(move || {
let status = child.wait()?;
if killed.try_recv().is_err() {
error!("{} ({}) died unexpectedly", n, c);
Self::dump_log_tail(&lf, &n);
dead.send(())?;
} else {
info!("{} stopped", n);
}
debug!("{} ({}) {}", n, c, status);
Ok(())
});
Ok(Process {
command: command.into(),
died,
kill,
log_file,
name: identifier.into(),
pid,
readiness_timeout: 120,
watch: Some(watch),
})
}
pub fn wait_ready(&mut self, pattern: &str) -> Result<()> {
debug!(
"Waiting for process '{}' ({}) to become ready with pattern: '{}'",
self.name, self.command, pattern
);
let now = Instant::now();
let file = File::open(&self.log_file)?;
let mut reader = BufReader::new(file);
let mut line = String::new();
while now.elapsed().as_secs() < self.readiness_timeout {
line.clear();
reader.read_line(&mut line)?;
if line.is_empty() {
if self.died.try_recv().is_ok() {
bail!(
"{} ({}) died before becoming ready",
self.name,
self.command
)
}
sleep(Duration::from_millis(10));
continue;
}
if line.contains(pattern) {
info!("{} is ready", self.name);
debug!("Found pattern '{}' in line '{}'", pattern, line.trim());
return Ok(());
}
}
Self::dump_log_tail(&self.log_file, &self.name);
self.stop()?;
bail!(
"Timed out after {}s waiting for process '{}' ({}) to become ready \
with pattern '{}' (log: {})",
self.readiness_timeout,
self.name,
self.command,
pattern,
self.log_file.display(),
)
}
fn dump_log_tail(log_file: &Path, name: &str) {
if let Ok(content) = fs::read_to_string(log_file) {
let lines: Vec<&str> = content.lines().collect();
let start = lines.len().saturating_sub(20);
error!("Last log lines of {}:", name);
for line in &lines[start..] {
error!(" {}", line);
}
}
}
}
impl Stoppable for Process {
fn stop(&mut self) -> Result<()> {
debug!("Stopping process {} (via {})", self.name, self.command);
self.kill.send(()).with_context(|| {
format!(
"Unable to send kill signal to process {} (via {})",
self.name, self.command,
)
})?;
let pid = i32::try_from(self.pid)
.with_context(|| format!("PID {} exceeds i32 range", self.pid))?;
kill(Pid::from_raw(pid), Signal::SIGTERM)?;
if let Some(handle) = self.watch.take()
&& handle.join().is_err()
{
bail!(
"Unable to stop process {} (via {})",
self.name,
self.command
);
}
debug!("Process {} (via {}) stopped", self.name, self.command);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn start_success() -> Result<()> {
let d = tempdir()?;
Process::start(d.path(), "", "echo", &[])?;
Ok(())
}
#[test]
fn start_failure_no_command() -> Result<()> {
let d = tempdir()?;
assert!(Process::start(d.path(), "", "", &[]).is_err());
Ok(())
}
#[test]
fn start_failure_invalid_command() -> Result<()> {
let d = tempdir()?;
assert!(Process::start(d.path(), "", "invalid_command", &[]).is_err());
Ok(())
}
#[test]
fn wait_ready_success() -> Result<()> {
let d = tempdir()?;
let mut p = Process::start(d.path(), "", "echo", &["test"])?;
p.wait_ready("test")?;
Ok(())
}
#[test]
fn wait_ready_failure() -> Result<()> {
let d = tempdir()?;
let mut p = Process::start(d.path(), "", "echo", &["test"])?;
p.readiness_timeout = 1;
assert!(p.wait_ready("invalid").is_err());
Ok(())
}
#[test]
fn stop_success() -> Result<()> {
let d = tempdir()?;
let mut p = Process::start(d.path(), "", "sleep", &["500"])?;
p.stop()?;
Ok(())
}
}