contrail-cli 0.1.4

Contrail CLI: service lifecycle plus history import and cross-machine export/merge
Documentation
use anyhow::{Context, Result, bail};
use std::env;
use std::ffi::OsString;
use std::fs::{self, OpenOptions};
use std::io;
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};

/// Descriptor for the `core_daemon` service.
///
/// It has no `health_addr`, so `start_process` does not probe a TCP port for
/// it and only checks that the process is still alive after spawning.
const PROC_CORE_DAEMON: ManagedProcess = ManagedProcess {
    name: "core_daemon",
    binary: "core_daemon",
    binary_env: "CONTRAIL_CORE_DAEMON_BIN",
    pid_file: "core_daemon.pid",
    log_file: "core_daemon.log",
    health_addr: None,
};

/// Descriptor for the `dashboard` service.
///
/// After spawning, `start_process` polls `127.0.0.1:3000` over TCP until a
/// connection succeeds or the health timeout elapses.
const PROC_DASHBOARD: ManagedProcess = ManagedProcess {
    name: "dashboard",
    binary: "dashboard",
    binary_env: "CONTRAIL_DASHBOARD_BIN",
    pid_file: "dashboard.pid",
    log_file: "dashboard.log",
    health_addr: Some("127.0.0.1:3000"),
};

/// Descriptor for the `analysis` service.
///
/// After spawning, `start_process` polls `127.0.0.1:3210` over TCP until a
/// connection succeeds or the health timeout elapses.
const PROC_ANALYSIS: ManagedProcess = ManagedProcess {
    name: "analysis",
    binary: "analysis",
    binary_env: "CONTRAIL_ANALYSIS_BIN",
    pid_file: "analysis.pid",
    log_file: "analysis.log",
    health_addr: Some("127.0.0.1:3210"),
};

/// Order in which `up` starts services (and `status` lists them):
/// core daemon first, then dashboard, then analysis.
const PROCS_START_ORDER: [ManagedProcess; 3] = [PROC_CORE_DAEMON, PROC_DASHBOARD, PROC_ANALYSIS];
/// Order in which `down` stops services: the exact reverse of the start order,
/// so the core daemon is stopped last.
const PROCS_STOP_ORDER: [ManagedProcess; 3] = [PROC_ANALYSIS, PROC_DASHBOARD, PROC_CORE_DAEMON];

/// CLI entry point.
///
/// When the first argument is `up`, `down`, or `status`, the matching
/// lifecycle command is executed. Every other invocation (including one with
/// no arguments) is handed to `importer::run()` unchanged.
pub fn run() -> Result<()> {
    let argv: Vec<OsString> = env::args_os().collect();
    match parse_lifecycle_command(&argv) {
        Some(lifecycle) => run_lifecycle_command(lifecycle),
        None => importer::run(),
    }
}

/// Service-lifecycle subcommands handled by this module rather than by the
/// importer CLI.
#[derive(Clone, Copy)]
enum LifecycleCommand {
    /// Start every managed process in `PROCS_START_ORDER`; previously started
    /// processes are stopped again if a later one fails to start.
    Up,
    /// Stop every managed process in `PROCS_STOP_ORDER`.
    Down,
    /// Print whether each managed process is running or stopped.
    Status,
}

/// Static description of one background service managed by the CLI.
#[derive(Clone, Copy)]
struct ManagedProcess {
    /// Label used in messages printed to the user.
    name: &'static str,
    /// Executable file name; looked up next to the current executable first
    /// and otherwise handed to `Command::new` as-is.
    binary: &'static str,
    /// Environment variable that, when set and non-empty, overrides the
    /// binary path.
    binary_env: &'static str,
    /// File name (inside the run directory) holding the spawned pid.
    pid_file: &'static str,
    /// File name (inside the run directory) that receives the process's
    /// stdout and stderr, opened in append mode.
    log_file: &'static str,
    /// TCP address polled after start to decide the service is healthy;
    /// `None` skips the probe.
    health_addr: Option<&'static str>,
}

/// Maps the first CLI argument (`args[1]`) to a lifecycle command.
///
/// Returns `None` when there is no such argument, when it is not valid
/// UTF-8, or when it is not one of `up`, `down`, `status`.
fn parse_lifecycle_command(args: &[OsString]) -> Option<LifecycleCommand> {
    const KNOWN: [(&str, LifecycleCommand); 3] = [
        ("up", LifecycleCommand::Up),
        ("down", LifecycleCommand::Down),
        ("status", LifecycleCommand::Status),
    ];

    let word = args.get(1).and_then(|arg| arg.to_str())?;
    KNOWN
        .iter()
        .find(|(keyword, _)| *keyword == word)
        .map(|(_, lifecycle)| *lifecycle)
}

/// Executes one lifecycle command against all managed processes.
///
/// The run directory (`<contrail root>/run`) is created first; it holds the
/// pid and log files.
///
/// * `Up` starts processes in `PROCS_START_ORDER`. If one fails, that process
///   (when it left a pid file behind) and every process handled before it are
///   stopped again, in reverse order, and the original start error is
///   returned. Rollback failures are ignored so they cannot mask that error.
/// * `Down` attempts to stop every process in `PROCS_STOP_ORDER`, even if an
///   earlier stop failed. The first failure is returned; later ones are
///   printed as warnings.
/// * `Status` prints one line per process.
///
/// # Errors
/// Fails if the root or run directory cannot be resolved/created, or if a
/// start/stop operation fails as described above.
fn run_lifecycle_command(command: LifecycleCommand) -> Result<()> {
    let run_dir = contrail_root_dir()?.join("run");
    fs::create_dir_all(&run_dir)
        .with_context(|| format!("failed to create run directory at {}", run_dir.display()))?;

    match command {
        LifecycleCommand::Up => {
            let mut started: Vec<ManagedProcess> = Vec::new();
            for process in PROCS_START_ORDER {
                if let Err(err) = start_process(&run_dir, process) {
                    // The failing process may already have been spawned and
                    // had its pid file written (e.g. it never became healthy).
                    // Without this it would be left running while everything
                    // it depends on is torn down below.
                    if run_dir.join(process.pid_file).exists() {
                        let _ = stop_process(&run_dir, process);
                    }
                    for started_process in started.iter().rev() {
                        let _ = stop_process(&run_dir, *started_process);
                    }
                    return Err(err);
                }
                started.push(process);
            }
        }
        LifecycleCommand::Down => {
            // Keep going after a failure so one stubborn service does not
            // leave the remaining ones running.
            let mut first_error = None;
            for process in PROCS_STOP_ORDER {
                if let Err(err) = stop_process(&run_dir, process) {
                    if first_error.is_some() {
                        eprintln!("warning: {err:#}");
                    } else {
                        first_error = Some(err);
                    }
                }
            }
            if let Some(err) = first_error {
                return Err(err);
            }
        }
        LifecycleCommand::Status => {
            for process in PROCS_START_ORDER {
                print_process_status(&run_dir, process);
            }
        }
    }

    Ok(())
}

/// Resolves the Contrail state directory.
///
/// Uses `CONTRAIL_HOME` when it is set to a non-empty value, otherwise
/// `$HOME/.contrail`. Empty values are treated as unset (matching how
/// `resolve_binary_path` treats its override variable); an empty path would
/// otherwise silently resolve relative to the current working directory.
///
/// # Errors
/// Fails when neither `CONTRAIL_HOME` nor `HOME` provides a usable value.
fn contrail_root_dir() -> Result<PathBuf> {
    if let Some(root) = env::var_os("CONTRAIL_HOME").filter(|value| !value.is_empty()) {
        return Ok(PathBuf::from(root));
    }

    let home = env::var_os("HOME")
        .filter(|value| !value.is_empty())
        .map(PathBuf::from)
        .context("HOME is not set and CONTRAIL_HOME was not provided")?;
    Ok(home.join(".contrail"))
}

fn start_process(run_dir: &Path, process: ManagedProcess) -> Result<()> {
    let pid_path = run_dir.join(process.pid_file);
    let log_path = run_dir.join(process.log_file);

    if let Some(pid) = read_pid(&pid_path) {
        if is_pid_running(pid) {
            println!("{} already running (pid {})", process.name, pid);
            return Ok(());
        }
        fs::remove_file(&pid_path).ok();
    }

    let stdout_log = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&log_path)
        .with_context(|| format!("failed to open log file {}", log_path.display()))?;
    let stderr_log = stdout_log
        .try_clone()
        .with_context(|| format!("failed to clone log file handle {}", log_path.display()))?;

    let binary = resolve_binary_path(process)?;
    let mut command = Command::new(&binary);
    command
        .stdin(Stdio::null())
        .stdout(Stdio::from(stdout_log))
        .stderr(Stdio::from(stderr_log));

    let child = match command.spawn() {
        Ok(child) => child,
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            bail!(
                "{} binary not found in PATH. Install it, then retry.",
                process.binary
            )
        }
        Err(err) => {
            return Err(err).with_context(|| format!("failed to start {}", process.binary));
        }
    };

    let pid = child.id();
    fs::write(&pid_path, format!("{pid}\n"))
        .with_context(|| format!("failed to write pid file {}", pid_path.display()))?;
    println!(
        "started {} (pid {}, binary {}, log {})",
        process.name,
        pid,
        binary.display(),
        log_path.display()
    );

    let became_healthy = if let Some(addr) = process.health_addr {
        wait_for_health(process.name, addr)
    } else {
        true
    };

    if !became_healthy {
        if !is_pid_running(pid) {
            bail!(
                "{} exited before becoming healthy. Check {}. If a different `{}` binary is installed, set {} to the intended binary path.",
                process.name,
                log_path.display(),
                process.binary,
                process.binary_env
            );
        }
        bail!(
            "{} did not become healthy within timeout. Check {}. If a different `{}` binary is installed, set {} to the intended binary path.",
            process.name,
            log_path.display(),
            process.binary,
            process.binary_env
        );
    } else if !is_pid_running(pid) {
        bail!(
            "{} exited shortly after start. Check {}",
            process.name,
            log_path.display()
        );
    }

    Ok(())
}

/// Stops one managed process identified by its pid file.
///
/// If there is no readable pid file, or the recorded pid is no longer alive
/// (the stale file is removed), the process is reported as not running.
/// Otherwise `kill` is invoked with its default signal, the process is given
/// up to five seconds (polled every 100 ms) to exit, and `kill -9` is sent if
/// it is still alive. The pid file is removed afterwards.
///
/// # Errors
/// Fails when `kill` cannot be executed, or when the forced kill reports
/// failure and the process is still alive.
fn stop_process(run_dir: &Path, process: ManagedProcess) -> Result<()> {
    let pid_path = run_dir.join(process.pid_file);

    let pid = match read_pid(&pid_path) {
        Some(pid) if is_pid_running(pid) => pid,
        recorded => {
            // Only a parsed-but-dead pid leaves a stale file to clean up.
            if recorded.is_some() {
                fs::remove_file(&pid_path).ok();
            }
            println!("{} not running", process.name);
            return Ok(());
        }
    };

    // Graceful request first; only a failure to run `kill` itself is fatal.
    send_signal(pid, None)?;

    let grace_period = Duration::from_secs(5);
    let requested_at = Instant::now();
    while requested_at.elapsed() < grace_period && is_pid_running(pid) {
        thread::sleep(Duration::from_millis(100));
    }

    // Escalate only when the process survived the grace period.
    if is_pid_running(pid) && !send_signal(pid, Some("-9"))? && is_pid_running(pid) {
        bail!("failed to stop {} (pid {})", process.name, pid);
    }

    fs::remove_file(&pid_path).ok();
    println!("stopped {} (pid {})", process.name, pid);
    Ok(())
}

/// Prints a one-line status (`running (pid N)` or `stopped`) for a process.
///
/// A pid file that names a process which is no longer alive is removed as a
/// side effect.
fn print_process_status(run_dir: &Path, process: ManagedProcess) {
    let pid_path = run_dir.join(process.pid_file);

    if let Some(pid) = read_pid(&pid_path) {
        if is_pid_running(pid) {
            println!("{}: running (pid {})", process.name, pid);
            return;
        }
        // Recorded pid is dead: drop the stale file before reporting.
        fs::remove_file(&pid_path).ok();
    }

    println!("{}: stopped", process.name);
}

/// Reads a process id from a pid file.
///
/// Returns `None` when the file cannot be read, when its trimmed content is
/// not an unsigned integer, or when the value cannot be a real process id.
///
/// Values outside `1..=i32::MAX` are rejected because the pid is later passed
/// to the `kill` utility: pid `0` addresses the caller's whole process group,
/// and values that do not fit a signed 32-bit pid may be interpreted as
/// negative (group/broadcast) targets.
fn read_pid(pid_path: &Path) -> Option<u32> {
    let raw = fs::read_to_string(pid_path).ok()?;
    let pid = raw.trim().parse::<u32>().ok()?;
    (1..=i32::MAX as u32).contains(&pid).then_some(pid)
}

/// Reports whether `kill -0 <pid>` succeeds for the given pid.
///
/// Output of the `kill` utility is discarded. Any failure to run it, or a
/// non-zero exit status, is treated as "not running".
fn is_pid_running(pid: u32) -> bool {
    let target = pid.to_string();
    let probe = Command::new("kill")
        .args(["-0", target.as_str()])
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();

    matches!(probe, Ok(status) if status.success())
}

/// Invokes the `kill` utility for `pid`.
///
/// `signal` is passed through verbatim as the first argument (for example
/// `"-9"`); with `None`, `kill` is run with the pid only and therefore uses
/// its default signal. Output of the utility is discarded.
///
/// Returns `Ok(true)` when `kill` exited successfully and `Ok(false)` when it
/// exited with a failure status.
///
/// # Errors
/// Fails when the `kill` utility itself cannot be executed.
fn send_signal(pid: u32, signal: Option<&str>) -> Result<bool> {
    let target = pid.to_string();
    let outcome = Command::new("kill")
        // An `Option` iterates over zero or one items, so the flag is only
        // added when present.
        .args(signal)
        .arg(&target)
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()
        .with_context(|| format!("failed to send signal to pid {}", pid))?;

    Ok(outcome.success())
}

/// Polls `addr` until a TCP connection succeeds or 15 seconds have passed.
///
/// A connection attempt is made every 500 ms. On success a "healthy" line is
/// printed and `true` is returned; on timeout a warning is written to stderr
/// and `false` is returned. `name` is used only in those messages.
fn wait_for_health(name: &str, addr: &str) -> bool {
    const TIMEOUT: Duration = Duration::from_secs(15);
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    let began = Instant::now();
    loop {
        if began.elapsed() >= TIMEOUT {
            break;
        }
        match TcpStream::connect(addr) {
            Ok(_) => {
                println!("{} healthy at http://{}", name, addr);
                return true;
            }
            Err(_) => thread::sleep(POLL_INTERVAL),
        }
    }

    eprintln!(
        "warning: {} did not become healthy at http://{}",
        name, addr
    );
    false
}

/// Decides which executable to launch for a managed process.
///
/// Resolution order:
/// 1. The path in the process's `binary_env` variable, when set and non-empty.
/// 2. A regular file named `binary` in the same directory as the current
///    executable.
/// 3. The bare `binary` name, leaving lookup to `Command` at spawn time.
///
/// This function currently never returns an error.
fn resolve_binary_path(process: ManagedProcess) -> Result<PathBuf> {
    let override_path = env::var_os(process.binary_env).filter(|value| !value.is_empty());
    if let Some(explicit) = override_path {
        return Ok(PathBuf::from(explicit));
    }

    let sibling = env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(|bin_dir| bin_dir.join(process.binary)))
        .filter(|candidate| candidate.is_file());

    Ok(sibling.unwrap_or_else(|| PathBuf::from(process.binary)))
}

#[cfg(test)]
mod tests {
    use super::{LifecycleCommand, parse_lifecycle_command};
    use std::ffi::OsString;

    /// Builds an argv of the form `contrail <subcommand>`.
    fn argv(subcommand: &str) -> Vec<OsString> {
        vec![OsString::from("contrail"), OsString::from(subcommand)]
    }

    #[test]
    fn parses_lifecycle_commands() {
        assert!(matches!(
            parse_lifecycle_command(&argv("up")),
            Some(LifecycleCommand::Up)
        ));
        assert!(matches!(
            parse_lifecycle_command(&argv("down")),
            Some(LifecycleCommand::Down)
        ));
        assert!(matches!(
            parse_lifecycle_command(&argv("status")),
            Some(LifecycleCommand::Status)
        ));
    }

    #[test]
    fn leaves_other_commands_for_importer_cli() {
        // Anything that is not a lifecycle keyword must fall through.
        assert!(parse_lifecycle_command(&argv("import-history")).is_none());
    }
}