mothership 0.0.100

Process supervisor with HTTP exposure - wrap, monitor, and expose your fleet
Documentation
//! Pre-launch job execution
//!
//! Runs pre-launch jobs in dependency order before any ship/bay starts.
//! If any job fails, the entire launch is aborted.

use crate::charter::Prelaunch;
use std::collections::{HashMap, HashSet};
use std::process::Stdio;
use tokio::process::Command;
use tracing::{error, info};

/// Error during prelaunch execution
///
/// Produced when a job cannot be spawned, when it exits unsuccessfully, or
/// when the job dependency graph contains a cycle.
#[derive(Debug)]
pub struct PrelaunchError {
    /// Name of the job the error is attributed to.
    pub job_name: String,
    /// Human-readable description of what went wrong.
    pub message: String,
    /// Exit code of the job's process when one was reported; `None` for
    /// errors that have no exit code (e.g. spawn failures, dependency cycles).
    pub exit_code: Option<i32>,
}

impl std::fmt::Display for PrelaunchError {
    /// Renders `Prelaunch job '<name>' failed: <message>`, followed by
    /// ` (exit code: <code>)` when an exit code is present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            job_name,
            message,
            exit_code,
        } = self;

        write!(f, "Prelaunch job '{}' failed: {}", job_name, message)?;

        // The exit-code suffix is only appended when a code was recorded.
        match exit_code {
            Some(code) => write!(f, " (exit code: {})", code),
            None => Ok(()),
        }
    }
}

// Empty impl: the trait's default methods are used as-is. This lets
// `PrelaunchError` be used wherever a `std::error::Error` is expected.
impl std::error::Error for PrelaunchError {}

/// Run every prelaunch job, honouring `depends_on` ordering.
///
/// The jobs are first ordered so that dependencies come before their
/// dependents, then executed one at a time. `global_env` is handed to every
/// job. An empty job list is a no-op.
///
/// # Errors
///
/// Returns the first [`PrelaunchError`] encountered: either from ordering the
/// jobs or from a job that failed to run. Remaining jobs are not executed.
pub async fn run_prelaunch(
    jobs: &[Prelaunch],
    global_env: &HashMap<String, String>,
) -> Result<(), PrelaunchError> {
    if !jobs.is_empty() {
        info!("[PRELAUNCH] Running {} job(s)", jobs.len());

        // Dependencies first; the `?` on each job stops at the first failure.
        for job in topological_sort(jobs)? {
            run_job(job, global_env).await?;
        }

        info!("[PRELAUNCH] All jobs completed successfully");
    }

    Ok(())
}

/// Run a single prelaunch job
///
/// Spawns `job.command` with `job.args`, waits for it to exit, and forwards
/// its captured output to the log: stdout lines at `info` level and stderr
/// lines at `error` level, each prefixed with the job name.
///
/// The child inherits this process's environment. Entries from `global_env`
/// are applied on top of it, and entries from `job.env` are applied last, so
/// they take precedence over both.
///
/// # Errors
///
/// Returns a [`PrelaunchError`] when the command cannot be spawned (no exit
/// code), or when it exits unsuccessfully (carrying the exit code if the
/// platform reported one).
async fn run_job(
    job: &Prelaunch,
    global_env: &HashMap<String, String>,
) -> Result<(), PrelaunchError> {
    info!("[PRELAUNCH] Running '{}'", job.name);

    let mut cmd = Command::new(&job.command);
    cmd.args(&job.args);

    // The child inherits the parent environment by default, so only the
    // overrides are set here: global env first, then job-specific env.
    // (Copying the environment via `std::env::vars()` is unnecessary and
    // panics if any variable is not valid Unicode.)
    cmd.envs(global_env.iter());
    cmd.envs(job.env.iter());

    // Capture output for logging
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());

    let output = cmd.output().await.map_err(|e| PrelaunchError {
        job_name: job.name.clone(),
        message: format!("Failed to spawn: {}", e),
        exit_code: None,
    })?;

    // Log stdout/stderr if present; lossy decoding keeps non-UTF-8 output
    // from turning into an error.
    if !output.stdout.is_empty() {
        let stdout = String::from_utf8_lossy(&output.stdout);
        for line in stdout.lines() {
            info!("[PRELAUNCH:{}] {}", job.name, line);
        }
    }
    if !output.stderr.is_empty() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        for line in stderr.lines() {
            error!("[PRELAUNCH:{}] {}", job.name, line);
        }
    }

    if !output.status.success() {
        return Err(PrelaunchError {
            job_name: job.name.clone(),
            message: "Command failed".to_string(),
            // `code()` may be `None`, e.g. when the process was terminated
            // by a signal on Unix.
            exit_code: output.status.code(),
        });
    }

    info!("[PRELAUNCH] '{}' completed ✓", job.name);
    Ok(())
}

/// Topological sort of jobs by depends_on
///
/// Returns the jobs ordered so that every job appears after all of the jobs
/// it names in `depends_on`. The order is produced by a depth-first traversal
/// that visits `jobs` in input order.
///
/// Names in `depends_on` that do not match any job in `jobs` are ignored.
/// Job names are expected to be unique: if several jobs share a name, only
/// the last one with that name is scheduled.
///
/// # Errors
///
/// Returns a [`PrelaunchError`] with the message
/// `"Circular dependency detected"` when the dependency graph contains a
/// cycle (including a job that depends on itself).
fn topological_sort(jobs: &[Prelaunch]) -> Result<Vec<&Prelaunch>, PrelaunchError> {
    let job_map: HashMap<&str, &Prelaunch> = jobs.iter().map(|j| (j.name.as_str(), j)).collect();

    let mut result = Vec::with_capacity(jobs.len());
    // Jobs already emitted into `result`.
    let mut visited: HashSet<&str> = HashSet::new();
    // Jobs on the current DFS path; meeting one again means a cycle.
    let mut in_stack: HashSet<&str> = HashSet::new();

    fn visit<'a>(
        name: &'a str,
        job_map: &HashMap<&str, &'a Prelaunch>,
        visited: &mut HashSet<&'a str>,
        in_stack: &mut HashSet<&'a str>,
        result: &mut Vec<&'a Prelaunch>,
    ) -> Result<(), PrelaunchError> {
        if in_stack.contains(name) {
            return Err(PrelaunchError {
                job_name: name.to_string(),
                message: "Circular dependency detected".to_string(),
                exit_code: None,
            });
        }
        if visited.contains(name) {
            return Ok(());
        }

        // Resolve the job *before* marking the name as in progress. An
        // unknown name must never be left behind in `in_stack`, otherwise a
        // second reference to it would be misreported as a cycle.
        let job = match job_map.get(name) {
            Some(&job) => job,
            None => return Ok(()),
        };

        in_stack.insert(name);
        for dep in &job.depends_on {
            visit(dep, job_map, visited, in_stack, result)?;
        }
        in_stack.remove(name);

        // All dependencies are already in `result`, so the job can follow.
        visited.insert(name);
        result.push(job);

        Ok(())
    }

    for job in jobs {
        visit(
            &job.name,
            &job_map,
            &mut visited,
            &mut in_stack,
            &mut result,
        )?;
    }

    Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal `echo <name>` job with the given dependency names.
    fn make_job(name: &str, depends_on: Vec<&str>) -> Prelaunch {
        let depends_on = depends_on.into_iter().map(str::to_owned).collect();
        Prelaunch {
            name: name.to_owned(),
            command: String::from("echo"),
            args: vec![name.to_owned()],
            env: HashMap::new(),
            depends_on,
            comment: None,
        }
    }

    /// Independent jobs must all survive the sort.
    #[test]
    fn test_topological_sort_no_deps() {
        let jobs: Vec<Prelaunch> = ["a", "b", "c"]
            .iter()
            .map(|name| make_job(name, Vec::new()))
            .collect();

        let sorted = topological_sort(&jobs).unwrap();

        assert_eq!(sorted.len(), 3);
    }

    /// A dependency declared later in the list still runs first.
    #[test]
    fn test_topological_sort_with_deps() {
        let memgraph = make_job("memgraph", vec!["postgres"]);
        let postgres = make_job("postgres", vec![]);
        let jobs = vec![memgraph, postgres];

        let sorted = topological_sort(&jobs).unwrap();

        assert_eq!(
            (sorted[0].name.as_str(), sorted[1].name.as_str()),
            ("postgres", "memgraph")
        );
    }

    /// Two jobs depending on each other must be rejected as a cycle.
    #[test]
    fn test_topological_sort_circular() {
        let jobs = vec![make_job("a", vec!["b"]), make_job("b", vec!["a"])];

        match topological_sort(&jobs) {
            Err(err) => assert!(err.message.contains("Circular")),
            Ok(_) => panic!("expected a circular dependency error"),
        }
    }
}