use anyhow::Result;
use clap::Parser;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{Notify, watch};
use tracing::{debug, info};
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use mothership::Lifecycle;
use mothership::charter::{Manifest, Vessel};
#[cfg(feature = "tokio-postgres")]
use mothership::flagship::{Coordinator, FlagshipRole, PostgresElection};
use mothership::fleet::{Fleet, run_prelaunch, set_suppress_stdout, verify_uplinks};
use mothership::http::{HttpExposure, MetricsRegistry, MetricsServer, set_global_metrics_registry};
#[cfg(feature = "tui")]
use mothership::tui::TuiApp;
/// Tag-based ship selection assembled from the `--only` / `--except` options.
///
/// An empty list places no constraint on that side of the filter.
#[derive(Debug, Clone, Default)]
pub struct TagFilter {
    /// When non-empty, a ship must carry at least one of these tags.
    pub only: Vec<String>,
    /// A ship carrying any of these tags is rejected.
    pub except: Vec<String>,
}

impl TagFilter {
    /// Decides whether a ship labelled with `tags` passes the filter.
    ///
    /// Exclusions are evaluated first, so a tag listed in `except` rejects the
    /// ship even when another of its tags is listed in `only`.
    pub fn matches(&self, tags: &[String]) -> bool {
        let excluded = tags.iter().any(|tag| self.except.contains(tag));
        if excluded {
            return false;
        }
        self.only.is_empty() || tags.iter().any(|tag| self.only.contains(tag))
    }

    /// Returns `true` when neither `only` nor `except` contains any tag.
    pub fn is_empty(&self) -> bool {
        self.except.is_empty() && self.only.is_empty()
    }
}
/// Concatenates two lists, keeping every element of `a` ahead of those of `b`.
///
/// Duplicates are preserved; nothing is sorted or de-duplicated.
fn merge_vecs(a: Vec<String>, b: Vec<String>) -> Vec<String> {
    a.into_iter().chain(b).collect()
}
// Top-level command line of the `mothership` binary, parsed with clap's derive API.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive macros, `///`
// doc comments on the struct or its fields are picked up as help text and would
// therefore change the `--help` output.
#[derive(Parser)]
#[command(name = "mothership")]
#[command(about = "Wrap, monitor, and expose your fleet", long_about = None)]
// clap's built-in version flag is turned off; `-v/--version` is declared by hand below.
#[command(disable_version_flag = true)]
struct Cli {
// Print the package version and exit (checked first thing in `async_main`).
#[arg(short = 'v', long = "version")]
version: bool,
// Optional subcommand; when absent the fleet is run with the top-level options.
#[command(subcommand)]
command: Option<Commands>,
// Manifest path. Subcommands that carry their own `--config` override this value.
#[arg(short, long, value_name = "FILE", default_value = "ship-manifest.toml")]
config: PathBuf,
// Request the terminal UI.
#[arg(long)]
tui: bool,
// Tags to include. Accepts comma-separated values and may be given several times.
#[arg(long, value_delimiter = ',', action = clap::ArgAction::Append)]
only: Vec<String>,
// Tags to exclude. Accepts comma-separated values and may be given several times.
#[arg(long, value_delimiter = ',', action = clap::ArgAction::Append)]
except: Vec<String>,
}
// Subcommands of the `mothership` binary.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive macros, `///`
// doc comments on variants or fields become help text and would change `--help`.
#[derive(clap::Subcommand)]
enum Commands {
// Launch the fleet. Mirrors the top-level options; `async_main` ORs the two `tui`
// flags together and concatenates the top-level and subcommand tag lists.
Run {
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
#[arg(long)]
tui: bool,
#[arg(long, value_delimiter = ',', action = clap::ArgAction::Append)]
only: Vec<String>,
#[arg(long, value_delimiter = ',', action = clap::ArgAction::Append)]
except: Vec<String>,
},
// Load the manifest and print a summary (dispatched to `validate_manifest`).
// Also reachable under the alias `validate`.
#[clap(alias = "validate")]
Clearance {
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
#[arg(short, long)]
verbose: bool,
},
// Load the manifest and check each configured uplink (dispatched to `preflight_check`).
Preflight {
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
#[arg(short, long)]
verbose: bool,
},
// Print binds, routes, bays and modules of the manifest (dispatched to `show_chart`).
Chart {
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
},
// Write a starter `ship-manifest.toml` (dispatched to `init_manifest`).
Init {
// Overwrite an existing manifest file.
#[arg(short, long)]
force: bool,
},
}
/// Tells whether the selected command launches and supervises fleet processes.
///
/// That is the case for the implicit default (no subcommand) and for `run`;
/// every other subcommand only inspects or writes a manifest.
fn runs_process_supervision(command: Option<&Commands>) -> bool {
    match command {
        None => true,
        Some(Commands::Run { .. }) => true,
        Some(_) => false,
    }
}
/// Acts as a minimal init process when mothership itself is started as PID 1.
///
/// Returns `Ok(())` immediately unless `should_supervise` is true, the current
/// process id is 1, and the `MS_PID1_SHIM_CHILD` environment variable is unset.
/// Otherwise the current executable is spawned again as a child (same arguments,
/// marker variable set, own process group) and this process stays in a loop that
/// reaps children and forwards signals to that child.
///
/// On the supervising path the function never returns `Ok`: the process exits
/// with the child's exit code (or `128 + signal` when the child was killed by a
/// signal), exits with 0 once no children remain, or the function returns an error.
///
/// # Errors
///
/// Fails if the executable path cannot be determined, the child cannot be
/// spawned, the signal mask cannot be changed, or waiting for a child or a
/// signal fails.
#[cfg(target_os = "linux")]
fn maybe_run_pid1_shim(should_supervise: bool) -> Result<()> {
use nix::errno::Errno;
use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
use nix::unistd::Pid;
use std::os::unix::process::CommandExt;
// Marker placed in the child's environment so the re-executed binary does not
// start a shim of its own.
const SHIM_CHILD_ENV: &str = "MS_PID1_SHIM_CHILD";
if !should_supervise {
return Ok(());
}
// Only act when we are PID 1 and are not already the shim's child.
if std::process::id() != 1 || std::env::var_os(SHIM_CHILD_ENV).is_some() {
return Ok(());
}
// Re-run this same binary with the original arguments (argv[0] skipped).
let exe = std::env::current_exe()?;
let mut cmd = std::process::Command::new(exe);
cmd.args(std::env::args_os().skip(1));
cmd.env(SHIM_CHILD_ENV, "1");
// SAFETY: `pre_exec` runs the closure in the child before the new program is
// executed; the closure only calls `setpgid` and reads the last OS error.
// NOTE(review): relies on both being async-signal-safe — confirm against the
// `CommandExt::pre_exec` safety requirements.
unsafe {
cmd.pre_exec(|| {
// Make the child the leader of a new process group so signals can later be
// sent to the whole group.
if libc::setpgid(0, 0) != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
});
}
let child_pid = {
let child = cmd.spawn()?;
Pid::from_raw(child.id() as i32)
};
// Signals handled synchronously below via `sigset.wait()`; they are blocked
// only after the child has been spawned.
let mut sigset = SigSet::empty();
for sig in [
Signal::SIGCHLD,
Signal::SIGTERM,
Signal::SIGINT,
Signal::SIGHUP,
Signal::SIGQUIT,
] {
sigset.add(sig);
}
signal::pthread_sigmask(SigmaskHow::SIG_BLOCK, Some(&sigset), None)?;
loop {
// Reap every child that has already terminated (non-blocking) before waiting
// for the next signal.
loop {
match waitpid(Pid::from_raw(-1), Some(WaitPidFlag::WNOHANG)) {
// Children exist but none has changed state: go wait for a signal.
Ok(WaitStatus::StillAlive) => break,
// The main child ended: mirror its exit status as our own.
Ok(WaitStatus::Exited(pid, code)) if pid == child_pid => std::process::exit(code),
Ok(WaitStatus::Signaled(pid, sig, _)) if pid == child_pid => {
std::process::exit(128 + sig as i32);
}
// Some other child was reaped; keep draining.
Ok(_) => {}
// No children left at all.
Err(Errno::ECHILD) => std::process::exit(0),
Err(e) => return Err(e.into()),
}
}
match sigset.wait()? {
// Handled by the reap loop on the next iteration.
Signal::SIGCHLD => {}
// Forward everything else to the child's process group and to the child
// itself; delivery errors are deliberately ignored.
sig => {
let _ = signal::killpg(child_pid, sig);
let _ = signal::kill(child_pid, sig);
}
}
}
}
/// No-op variant of the PID 1 shim for non-Linux targets: the argument is
/// ignored and `Ok(())` is always returned.
#[cfg(not(target_os = "linux"))]
fn maybe_run_pid1_shim(_should_supervise: bool) -> Result<()> {
Ok(())
}
/// Process entry point.
///
/// Parses the command line, optionally turns this process into the PID 1 shim
/// (see `maybe_run_pid1_shim`), then builds a multi-threaded tokio runtime and
/// runs `async_main` on it.
///
/// # Errors
///
/// Propagates failures from the shim, from building the runtime, and from
/// `async_main`.
fn main() -> Result<()> {
    let cli = Cli::parse();

    // `--version` only prints a string and returns (see `async_main`), so it must
    // not spawn a supervised child when we happen to be PID 1.
    let should_supervise = !cli.version && runs_process_supervision(cli.command.as_ref());

    // The shim is handled before the runtime exists.
    // NOTE(review): presumably so the child is spawned while the process is still
    // single-threaded — confirm.
    maybe_run_pid1_shim(should_supervise)?;

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    runtime.block_on(async_main(cli))
}
/// Async entry point: handles `--version`, sets up logging, and dispatches the
/// selected subcommand (running the fleet when none is given).
///
/// The TUI is considered enabled only when it was requested *and* the binary was
/// built with the `tui` feature. That single decision drives both the logging
/// setup and the flag handed to `run_fleet`, so a build without the feature
/// never suppresses output for a TUI that cannot start.
///
/// # Errors
///
/// Propagates a failure to parse the default log directive and any error
/// returned by the dispatched command.
async fn async_main(cli: Cli) -> Result<()> {
    if cli.version {
        println!("mothership {}", env!("CARGO_PKG_VERSION"));
        return Ok(());
    }

    // The TUI can be requested at the top level or on the `run` subcommand.
    let tui_requested = match &cli.command {
        Some(Commands::Run { tui, .. }) => *tui || cli.tui,
        None => cli.tui,
        Some(
            Commands::Clearance { .. }
            | Commands::Preflight { .. }
            | Commands::Chart { .. }
            | Commands::Init { .. },
        ) => false,
    };

    #[cfg(feature = "tui")]
    let tui_enabled = tui_requested;
    #[cfg(not(feature = "tui"))]
    let tui_enabled = {
        if tui_requested {
            eprintln!("Warning: TUI requested but not compiled in. Build with --features tui");
        }
        false
    };

    // JSON logs are only installed when no TUI is going to be shown.
    if !tui_enabled {
        tracing_subscriber::registry()
            .with(fmt::layer().json())
            .with(EnvFilter::from_default_env().add_directive("mothership=info".parse()?))
            .init();
    }

    match cli.command {
        Some(Commands::Init { force }) => {
            init_manifest(force)?;
        }
        Some(Commands::Clearance { config, verbose }) => {
            let path = config.unwrap_or(cli.config);
            validate_manifest(&path, verbose)?;
        }
        Some(Commands::Preflight { config, verbose }) => {
            let path = config.unwrap_or(cli.config);
            preflight_check(&path, verbose).await?;
        }
        Some(Commands::Chart { config }) => {
            let path = config.unwrap_or(cli.config);
            show_chart(&path)?;
        }
        Some(Commands::Run {
            config,
            // Already folded into `tui_enabled` above.
            tui: _,
            only,
            except,
        }) => {
            let path = config.unwrap_or(cli.config);
            let filter = TagFilter {
                only: merge_vecs(cli.only, only),
                except: merge_vecs(cli.except, except),
            };
            // Pass the effective flag, not the raw request: without the `tui`
            // feature the raw request would silence stdout and logs for nothing.
            run_fleet(&path, tui_enabled, filter).await?;
        }
        None => {
            let filter = TagFilter {
                only: cli.only,
                except: cli.except,
            };
            run_fleet(&cli.config, tui_enabled, filter).await?;
        }
    }
    Ok(())
}
/// Writes a commented starter `ship-manifest.toml` into the current directory
/// and prints the suggested next steps.
///
/// # Errors
///
/// Fails when the file already exists and `force` is not set, or when the file
/// cannot be written.
fn init_manifest(force: bool) -> Result<()> {
    // Raw string: the TOML below is written to disk verbatim.
    const TEMPLATE: &str = r#"# Ship Manifest - Fleet Configuration
# https://github.com/seuros/mothership
# =============================================================================
# BASE TEMPLATES (optional - reduce duplication)
# =============================================================================
# Define defaults inherited by all ships/modules
# [base.ship]
# env = { RAILS_ENV = "production" }
# critical = true
# tags = ["ruby"]
# comment = "Default ship settings"
# [base.module]
# phase = "request"
# tags = ["security"]
# =============================================================================
# MOTHERSHIP (global configuration)
# =============================================================================
[mothership]
metrics_port = 9090 # Prometheus metrics at http://127.0.0.1:9090/metrics
# Named binds (external listeners)
[mothership.bind]
http = "0.0.0.0:80"
# https = "0.0.0.0:443"
# ws = "0.0.0.0:8080"
# Environment variables inherited by all ships
# [mothership.env]
# RAILS_ENV = "production"
# =============================================================================
# FLEET (ship definitions)
# =============================================================================
# Ships can be filtered by tags at runtime:
# mothership --only web # Only ships tagged "web"
# mothership --only web,api # Ships tagged "web" OR "api"
# mothership --except workers # All except "workers"
# Web ships (HTTP-serving applications)
[[fleet.web]]
name = "app"
command = "ruby"
args = ["app.rb"]
bind = "tcp://127.0.0.1:3000"
healthcheck = "/health"
tags = ["web", "ruby"]
routes = [
{ bind = "http", pattern = "/.*" },
]
# Background workers
# [[fleet.workers]]
# name = "sidekiq"
# command = "bundle"
# args = ["exec", "sidekiq"]
# critical = false # Non-critical: crash won't kill fleet
# tags = ["workers", "ruby"]
# One-shot jobs (run once and exit)
# [[fleet.jobs]]
# name = "migrate"
# command = "bundle"
# args = ["exec", "rails", "db:migrate"]
# oneshot = true
# tags = ["jobs", "ruby"]
# WebSocket server
# [[fleet.web]]
# name = "cable"
# command = "anycable-go"
# bind = "tcp://127.0.0.1:8081"
# depends_on = ["app"]
# tags = ["web", "websocket"]
# routes = [
# { bind = "ws", pattern = "/cable" },
# ]
# =============================================================================
# WASM MODULES (optional plugins)
# =============================================================================
# [[modules]]
# name = "auth"
# wasm = "modules/auth.wasm"
# routes = ["/admin/.*"]
# phase = "request"
# tags = ["security"]
"#;

    let target = PathBuf::from("ship-manifest.toml");
    let already_present = target.exists();
    if already_present && !force {
        anyhow::bail!("ship-manifest.toml already exists. Use --force to overwrite.");
    }

    std::fs::write(&target, TEMPLATE)?;

    // An empty entry prints a bare newline, exactly like `println!()`.
    for line in [
        "Created ship-manifest.toml",
        "",
        "Next steps:",
        " 1. Edit ship-manifest.toml with your app configuration",
        " 2. Run: mothership clearance",
        " 3. Run: mothership --tui",
    ] {
        println!("{line}");
    }
    Ok(())
}
/// Loads the manifest at `path` and prints a clearance report to stdout.
///
/// Without `verbose` only the ship, bay, uplink and prelaunch counts are shown;
/// with `verbose` every non-empty section (binds, fleet, bays, modules, uplinks,
/// prelaunch jobs) is listed entry by entry.
///
/// # Errors
///
/// Returns the load error after printing a `fail` result when the manifest
/// cannot be loaded.
fn validate_manifest(path: &PathBuf, verbose: bool) -> Result<()> {
    info!(path = %path.display(), "Validating manifest");
    println!("[CLEARANCE] {}", path.display());

    let manifest = match Manifest::load(path) {
        Err(load_err) => {
            println!("manifest: ✗");
            if verbose {
                println!(" error: {}", load_err);
            }
            println!();
            println!("[RESULT] fail");
            return Err(load_err);
        }
        Ok(loaded) => {
            println!("manifest: ✓");
            loaded
        }
    };

    let global = &manifest.mothership;
    if !verbose {
        // Compact summary: counts only.
        println!("ships: {}", manifest.ship_count());
        println!("bays: {}", manifest.bay_count());
        println!("uplinks: {}", global.uplinks.len());
        println!("prelaunch: {}", global.prelaunch.len());
    } else {
        if !global.bind.is_empty() {
            println!();
            println!("[BINDS]");
            for (name, bind) in &global.bind {
                println!("- {}: {:?}", name, bind);
            }
        }

        if !manifest.fleet.is_empty() {
            println!();
            println!("[FLEET] {} ships", manifest.ship_count());
            for (group_name, ships) in &manifest.fleet {
                for ship in ships {
                    // Only deviations from the common case are flagged.
                    let mut flags = String::new();
                    if !ship.critical {
                        flags.push_str(" [non-critical]");
                    }
                    if ship.oneshot {
                        flags.push_str(" [oneshot]");
                    }
                    println!("- {}/{}: {}{}", group_name, ship.name, ship.command, flags);
                }
            }
        }

        if !manifest.bays.is_empty() {
            println!();
            println!("[BAYS] {} bays", manifest.bay_count());
            for (bay_type, bays) in &manifest.bays {
                for bay in bays {
                    println!("- {}/{}: {}", bay_type, bay.name, bay.command);
                }
            }
        }

        if !manifest.modules.is_empty() {
            println!();
            println!("[MODULES] {}", manifest.modules.len());
            for module in &manifest.modules {
                println!("- {}: {} [{:?}]", module.name, module.wasm, module.phase);
            }
        }

        if !global.uplinks.is_empty() {
            println!();
            println!("[UPLINKS] {} configured", global.uplinks.len());
            for uplink in &global.uplinks {
                println!("- {}: {}", uplink.name, uplink.url);
            }
        }

        if !global.prelaunch.is_empty() {
            println!();
            println!("[PRELAUNCH] {} jobs", global.prelaunch.len());
            for job in &global.prelaunch {
                let after = if !job.depends_on.is_empty() {
                    format!(" (after: {})", job.depends_on.join(", "))
                } else {
                    String::new()
                };
                println!(
                    "- {}: {} {}{}",
                    job.name,
                    job.command,
                    job.args.join(" "),
                    after
                );
            }
        }
    }

    println!();
    println!("[RESULT] pass");
    Ok(())
}
/// Loads the manifest at `path` and checks every configured uplink, one at a
/// time, printing a preflight report to stdout.
///
/// With `verbose` each uplink is reported as it is checked; otherwise only the
/// failing uplinks are listed after the summary line.
///
/// # Errors
///
/// Returns the load error when the manifest cannot be loaded, or an error
/// stating how many uplinks were unreachable.
async fn preflight_check(path: &PathBuf, verbose: bool) -> Result<()> {
    println!("[PREFLIGHT] {}", path.display());

    let manifest = match Manifest::load(path) {
        Err(load_err) => {
            println!("manifest: ✗");
            if verbose {
                println!(" error: {}", load_err);
            }
            println!();
            println!("[RESULT] fail");
            return Err(load_err);
        }
        Ok(loaded) => {
            println!("manifest: ✓");
            loaded
        }
    };

    let uplinks = &manifest.mothership.uplinks;
    if uplinks.is_empty() {
        println!("uplinks: ✓ (none configured)");
    } else {
        let total = uplinks.len();
        let mut failures: Vec<(&str, String)> = Vec::new();

        for uplink in uplinks {
            // Verify one uplink per call so failures can be attributed by name.
            let outcome = verify_uplinks(std::slice::from_ref(uplink)).await;
            match outcome {
                Ok(()) => {
                    if verbose {
                        println!("- {}: ✓ ({})", uplink.name, uplink.url);
                    }
                }
                Err(cause) => {
                    let reason = cause.to_string();
                    if verbose {
                        println!("- {}: ✗ ({})", uplink.name, uplink.url);
                        println!(" error: {}", reason);
                    }
                    failures.push((&uplink.name, reason));
                }
            }
        }

        // Every uplink either passed or was recorded as a failure.
        let passed = total - failures.len();
        if !failures.is_empty() {
            println!("uplinks: ✗ ({}/{})", passed, total);
            if !verbose {
                for (name, error) in &failures {
                    println!(" {}: {}", name, error);
                }
            }
            println!();
            println!("[RESULT] fail");
            anyhow::bail!("{} uplink(s) unreachable", failures.len());
        }
        println!("uplinks: ✓ ({}/{})", passed, total);
    }

    println!();
    println!("[RESULT] pass");
    Ok(())
}
/// Prints a human-readable "navigation chart" of the manifest at `path`:
/// configured binds, static directories, the compression setting, routes
/// grouped by the bind they attach to, bays, and modules.
///
/// # Errors
///
/// Returns the error from `Manifest::load` when the manifest cannot be loaded.
fn show_chart(path: &PathBuf) -> Result<()> {
    use mothership::charter::Bind;

    // Renders a bind as `host:port` (TCP) or `unix://path` (Unix socket).
    let bind_addr = |bind: &Bind| -> String {
        match bind {
            Bind::Tcp { host, port, .. } => format!("{}:{}", host, port),
            Bind::Unix { path } => format!("unix://{}", path),
        }
    };

    let manifest = Manifest::load(path)?;

    println!("Navigation Chart");
    println!("================");
    println!();

    if manifest.mothership.bind.is_empty() {
        println!("Docking Ports: (none configured)");
    } else {
        println!("Docking Ports:");
        for (name, bind) in &manifest.mothership.bind {
            let proto_marker = if bind.has_proxy_protocol() {
                " [PROXY]"
            } else {
                ""
            };
            println!(" {} → {}{}", name, bind_addr(bind), proto_marker);
        }
    }
    println!();

    if !manifest.mothership.static_dirs.is_empty() {
        println!("Cargo Bays (static files):");
        for static_cfg in &manifest.mothership.static_dirs {
            println!(" {} → {}", static_cfg.prefix, static_cfg.path);
            if let Some(ref bind) = static_cfg.bind {
                println!(" bind: {}", bind);
            }
        }
        println!();
    }

    println!(
        "Compression: {}",
        if manifest.mothership.compression {
            "enabled"
        } else {
            "disabled"
        }
    );
    println!();

    println!("Flight Vectors:");
    // Routes grouped by bind name, in first-seen order:
    // (vessel, route pattern, optional strip prefix).
    #[allow(clippy::type_complexity)]
    let mut vectors_by_port: indexmap::IndexMap<String, Vec<(&Vessel, &str, Option<&str>)>> =
        indexmap::IndexMap::new();
    for vessel in &manifest.vessels {
        for route in vessel.routes() {
            vectors_by_port
                .entry(route.bind.clone())
                .or_default()
                .push((vessel, &route.pattern, route.strip_prefix.as_deref()));
        }
    }

    if vectors_by_port.is_empty() {
        println!(" (no vectors configured)");
    } else {
        for (port_name, vectors) in &vectors_by_port {
            // A route may name a bind that is not declared under `mothership.bind`.
            let port_addr = manifest
                .mothership
                .bind
                .get(port_name)
                .map(|bind| bind_addr(bind));
            println!();
            println!(
                " [{}] ({})",
                port_name,
                port_addr.as_deref().unwrap_or("unknown")
            );
            for (vessel, pattern, strip_prefix) in vectors {
                let (destination, suffix) = match vessel {
                    Vessel::Ship { config, .. } => {
                        let dock = match &config.bind {
                            Some(bind) => bind_addr(bind),
                            None => "(no dock)".to_string(),
                        };
                        (dock, String::new())
                    }
                    Vessel::Bay { bay_type, .. } => {
                        // Was mojibake ("âš“"): UTF-8 for U+2693 ANCHOR read as cp1252.
                        ("(docked)".to_string(), format!(" ⚓ {}", bay_type))
                    }
                };
                let strip = strip_prefix
                    .map(|p| format!(" (strip: {})", p))
                    .unwrap_or_default();
                println!(
                    " {} → {} [{}]{}{}",
                    pattern,
                    destination,
                    vessel.name(),
                    suffix,
                    strip
                );
            }
        }
    }
    println!();

    if !manifest.bays.is_empty() {
        println!("Docking Bays:");
        for (bay_type, bays) in &manifest.bays {
            for bay in bays {
                // Was mojibake ("âš "): UTF-8 for U+26A0 WARNING SIGN read as cp1252.
                let critical = if bay.critical { " ⚠" } else { "" };
                println!(" {} [{}]{}", bay.name, bay_type, critical);
                println!(" command: {} {}", bay.command, bay.args.join(" "));
            }
        }
        println!();
    }

    if !manifest.modules.is_empty() {
        println!("Onboard Modules:");
        for m in &manifest.modules {
            println!(" {} [{:?}]", m.name, m.phase);
            for route in &m.routes {
                println!(" {}", route);
            }
        }
    }
    Ok(())
}
/// Runs the configured flagship election and reports whether the CALLER should
/// run the prelaunch jobs afterwards.
///
/// * `"static"` — evaluates `config.static_flagship`; returns `true` only for
///   the flagship.
/// * `"postgres"` (needs the `tokio-postgres` feature) — elects through
///   `PostgresElection`. The flagship runs the prelaunch jobs right here and
///   then signals the escorts, so it returns `false`; escorts wait for the ready
///   signal and also return `false`; a `Disabled` role returns `true`.
/// * Any other backend name is an error.
///
/// `tui_enabled` only suppresses the `info!` log lines.
///
/// # Errors
///
/// Fails when the static expression cannot be evaluated, the election URL is
/// missing or cannot be expanded, the election fails, prelaunch fails (after a
/// best-effort abort signal), signalling ready fails, or waiting as an escort
/// fails.
async fn run_flagship_coordination(
config: &mothership::charter::FlagshipConfig,
#[allow(unused_variables)] manifest: &Manifest,
tui_enabled: bool,
) -> Result<bool> {
match config.election.as_str() {
"static" => {
let is_flagship = config
.static_flagship
.evaluate()
.map_err(|e| anyhow::anyhow!("failed to evaluate static_flagship: {}", e))?;
if !tui_enabled {
if is_flagship {
info!(
role = "flagship",
"Static election: this instance is the flagship"
);
} else {
info!(
role = "escort",
"Static election: this instance is an escort (skipping prelaunch)"
);
}
}
Ok(is_flagship)
}
#[cfg(feature = "tokio-postgres")]
"postgres" => {
use std::time::Duration;
// The first ship's name from the manifest is handed to the coordinator,
// falling back to "mothership" when the fleet is empty.
// NOTE(review): `run_fleet` calls this after tag filtering, so instances
// started with different filters may pass different names — confirm that
// is intended.
let app_name = manifest
.fleet
.values()
.flatten()
.next()
.map(|v| v.name.clone())
.unwrap_or_else(|| "mothership".to_string());
// `prelaunch_timeout` is interpreted as a number of seconds.
let prelaunch_timeout = Duration::from_secs(config.prelaunch_timeout);
let url = config
.expanded_election_url()
.ok_or_else(|| anyhow::anyhow!("election_url required for postgres election"))?
.map_err(|e| anyhow::anyhow!("failed to expand election_url: {}", e))?;
let election = PostgresElection::new(url);
let mut coordinator = Coordinator::new(election, app_name.clone(), prelaunch_timeout);
let role = coordinator
.elect()
.await
.map_err(|e| anyhow::anyhow!("flagship election failed: {}", e))?;
match role {
FlagshipRole::Flagship => {
if !tui_enabled {
info!(role = "flagship", "Elected as flagship, will run prelaunch");
}
// Best effort: a failure to signal "running" is ignored.
coordinator.signal_running().await.ok();
if !manifest.mothership.prelaunch.is_empty() {
if !tui_enabled {
info!(
count = manifest.mothership.prelaunch.len(),
"Running prelaunch jobs (flagship)"
);
}
match run_prelaunch(
&manifest.mothership.prelaunch,
&manifest.mothership.env,
)
.await
{
Ok(()) => {
coordinator.signal_ready().await.map_err(|e| {
anyhow::anyhow!("failed to signal ready: {}", e)
})?;
if !tui_enabled {
info!("Prelaunch complete, signaled ready to escorts");
}
}
Err(e) => {
// Tell the escorts to give up; the abort signal itself is best effort.
coordinator.signal_abort().await.ok();
return Err(anyhow::anyhow!(
"prelaunch failed, signaled abort: {}",
e
));
}
}
} else {
// Nothing to run: signal ready right away.
coordinator
.signal_ready()
.await
.map_err(|e| anyhow::anyhow!("failed to signal ready: {}", e))?;
}
// Prelaunch (if any) already ran above, so the caller must not run it again.
Ok(false)
}
FlagshipRole::Escort => {
if !tui_enabled {
info!(
role = "escort",
timeout_secs = prelaunch_timeout.as_secs(),
"Waiting for flagship signal"
);
}
coordinator
.wait_for_ready()
.await
.map_err(|e| anyhow::anyhow!("escort wait failed: {}", e))?;
if !tui_enabled {
info!("Received ready signal from flagship");
}
// Escorts never run prelaunch.
Ok(false)
}
FlagshipRole::Disabled => {
// No coordination took place: the caller runs prelaunch itself.
Ok(true)
}
}
}
#[cfg(not(feature = "tokio-postgres"))]
"postgres" => {
anyhow::bail!("postgres election requires the 'tokio-postgres' feature");
}
other => {
anyhow::bail!("unknown election backend: {}", other);
}
}
}
/// Runs the whole fleet lifecycle for the manifest at `path`.
///
/// Steps, in order: load the manifest, apply the tag `filter`, verify uplinks,
/// run the flagship election (when enabled), run prelaunch jobs, launch the
/// fleet, start the HTTP exposure layer, the metrics server, the fleet monitor
/// and (optionally) the TUI, wait for a shutdown request, then drain everything.
/// Each phase outcome is reported to a `Lifecycle` value whose `status()` is
/// logged at the end.
///
/// When `tui_enabled` is true, `set_suppress_stdout(true)` is called and the
/// `info!` log lines in this function are skipped.
///
/// # Errors
///
/// Fails when the manifest cannot be loaded, filtering leaves unsatisfied
/// vessel dependencies, an uplink check fails, the election fails, a prelaunch
/// job fails, or the fleet fails to launch (in which case the fleet is shut
/// down before returning).
async fn run_fleet(path: &PathBuf, tui_enabled: bool, filter: TagFilter) -> Result<()> {
let mut lifecycle = Lifecycle::new();
if tui_enabled {
set_suppress_stdout(true);
} else {
info!(path = %path.display(), "Loading manifest");
}
let mut manifest = Manifest::load(path)?;
lifecycle.manifest_loaded();
// Tag filtering: drop ships the filter rejects, then re-validate dependencies
// so a ship whose dependency was filtered out is reported as an error.
if !filter.is_empty() {
let before_count = manifest.ship_count();
manifest.filter_ships(&|tags| filter.matches(tags));
manifest.validate_vessel_dependencies()?;
let after_count = manifest.ship_count();
if !tui_enabled {
info!(
before = before_count,
after = after_count,
only = ?filter.only,
except = ?filter.except,
"Applied tag filter"
);
}
}
// Preflight: all uplinks are verified before anything is started.
if !manifest.mothership.uplinks.is_empty() {
if !tui_enabled {
info!(
count = manifest.mothership.uplinks.len(),
"Verifying uplinks"
);
}
if let Err(e) = verify_uplinks(&manifest.mothership.uplinks).await {
lifecycle.preflight_failed();
return Err(e.into());
}
if !tui_enabled {
info!("All uplinks verified");
}
lifecycle.preflight_complete();
} else {
lifecycle.preflight_skipped();
}
// Election: decides whether THIS instance runs the prelaunch jobs below.
// Without flagship coordination every instance runs them.
let flagship_config = &manifest.mothership.flagship;
let should_run_prelaunch = if flagship_config.enabled {
match run_flagship_coordination(flagship_config, &manifest, tui_enabled).await {
Ok(result) => {
lifecycle.election_complete();
result
}
Err(e) => {
lifecycle.election_failed();
return Err(e);
}
}
} else {
lifecycle.election_skipped();
true
};
if should_run_prelaunch && !manifest.mothership.prelaunch.is_empty() {
if !tui_enabled {
info!(
count = manifest.mothership.prelaunch.len(),
"Running prelaunch jobs"
);
}
if let Err(e) =
run_prelaunch(&manifest.mothership.prelaunch, &manifest.mothership.env).await
{
lifecycle.prelaunch_failed();
return Err(e.into());
}
if !tui_enabled {
info!("All prelaunch jobs completed");
}
lifecycle.prelaunch_complete();
} else {
lifecycle.prelaunch_skipped();
}
// The docking phase is always reported as skipped here.
lifecycle.docking_skipped();
let fleet = Arc::new(Fleet::from_manifest(&manifest));
// Shutdown plumbing: one `Notify` for the main wait below plus one watch
// channel per background service (HTTP, metrics, TUI).
let shutdown = Arc::new(Notify::new());
let (http_shutdown_tx, http_shutdown_rx) = watch::channel(false);
let (metrics_shutdown_tx, metrics_shutdown_rx) = watch::channel(false);
#[cfg(feature = "tui")]
let (tui_shutdown_tx, tui_shutdown_rx) = watch::channel(false);
#[cfg(not(feature = "tui"))]
let (tui_shutdown_tx, _tui_shutdown_rx) = watch::channel(false);
let metrics_registry = Arc::new(MetricsRegistry::new());
set_global_metrics_registry(metrics_registry.clone());
let shutdown_signal = shutdown.clone();
let http_shutdown = http_shutdown_tx.clone();
let metrics_shutdown = metrics_shutdown_tx.clone();
let tui_shutdown = tui_shutdown_tx.clone();
// Signal task: waits for the first SIGTERM/SIGINT (Ctrl-C on non-unix targets)
// and then requests shutdown on every channel. It is only installed at this
// point, i.e. after preflight, election and prelaunch have finished.
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate()).expect("SIGTERM handler");
let mut sigint = signal(SignalKind::interrupt()).expect("SIGINT handler");
tokio::select! {
_ = sigterm.recv() => {
if !tui_enabled {
info!("Received SIGTERM, initiating shutdown");
}
}
_ = sigint.recv() => {
if !tui_enabled {
info!("Received SIGINT, initiating shutdown");
}
}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await.ok();
if !tui_enabled {
info!("Received SIGINT, initiating shutdown");
}
}
// NOTE(review): `notify_one` wakes a single waiter (or stores one permit). If
// `Fleet::monitor` also awaits this `Notify`, only one of the two would be
// woken — confirm how `monitor` uses it.
shutdown_signal.notify_one();
let _ = http_shutdown.send(true);
let _ = metrics_shutdown.send(true);
let _ = tui_shutdown.send(true);
});
// Launch. On failure, shut the fleet down so ships that did start are not
// left running.
if let Err(e) = fleet.launch().await {
lifecycle.launch_failed();
fleet.shutdown().await;
return Err(e);
}
lifecycle.launch_complete();
// HTTP exposure: only when `HttpExposure::from_manifest` yields a value. Bay
// connectors are registered before the server task is spawned.
let http_handle = if let Some(http_exposure) = HttpExposure::from_manifest(&manifest) {
for bay in fleet.all_bays() {
if let Some(connector) = fleet.get_bay_connector(&bay.name).await {
http_exposure
.register_bay_connector(bay.name.clone(), connector)
.await;
debug!(bay = %bay.name, "Registered bay connector with HTTP layer");
}
}
Some(tokio::spawn(async move {
// A failure of the HTTP layer is logged only; it does not trigger shutdown.
if let Err(e) = http_exposure.run(http_shutdown_rx).await {
tracing::error!(error = %e, "HTTP exposure layer failed");
}
}))
} else {
if !tui_enabled {
info!("No mothership.bind configured, HTTP exposure disabled");
}
None
};
// Metrics server: only when `metrics_port` is configured.
let metrics_handle = if let Some(port) = manifest.mothership.metrics_port {
let metrics_fleet = fleet.clone();
let metrics_reg = metrics_registry.clone();
Some(tokio::spawn(async move {
let server = MetricsServer::new(port, metrics_fleet, metrics_reg);
server.run(metrics_shutdown_rx).await;
}))
} else {
None
};
// Fleet monitor runs in the background with access to the shutdown `Notify`.
let fleet_monitor = fleet.clone();
let shutdown_notify = shutdown.clone();
let monitor_handle = tokio::spawn(async move {
fleet_monitor.monitor(shutdown_notify).await;
});
#[cfg(feature = "tui")]
let tui_handle = if tui_enabled {
let tui_fleet = fleet.clone();
Some(tokio::spawn(async move {
match TuiApp::new(tui_fleet, tui_shutdown_rx).await {
Ok(mut app) => {
if let Err(e) = app.run().await {
eprintln!("TUI error: {}", e);
}
}
Err(e) => {
eprintln!("Failed to start TUI: {}", e);
}
}
}))
} else {
None
};
#[cfg(not(feature = "tui"))]
let tui_handle: Option<tokio::task::JoinHandle<()>> = None;
// Main wait. With a TUI running, whichever comes first wins: the TUI task
// ending (which requests shutdown) or a shutdown request (which stops the TUI
// and waits for it). Without a TUI, just wait for the shutdown request.
#[cfg(feature = "tui")]
if let Some(mut tui) = tui_handle {
tokio::select! {
_ = &mut tui => {
shutdown.notify_one();
}
_ = shutdown.notified() => {
let _ = tui_shutdown_tx.send(true);
let _ = tui.await;
}
}
} else {
shutdown.notified().await;
}
#[cfg(not(feature = "tui"))]
{
let _ = tui_handle; shutdown.notified().await;
}
// Drain: stop the monitor, then HTTP, then metrics, and finally the ships.
lifecycle.drain_started();
monitor_handle.abort();
if let Some(handle) = http_handle {
let _ = http_shutdown_tx.send(true);
let _ = handle.await;
}
if let Some(handle) = metrics_handle {
let _ = metrics_shutdown_tx.send(true);
let _ = handle.await;
}
fleet.shutdown().await;
lifecycle.drain_complete();
if !tui_enabled {
info!(status = %lifecycle.status(), "Mothership landed");
}
Ok(())
}
#[cfg(test)]
mod tests {
    use std::{fs, path::Path, time::Duration};
    use super::{Commands, runs_process_supervision};
    use tempfile::tempdir;
    use tokio::time::{Instant, sleep};

    /// Polls `path` until it holds a positive integer pid; panics after two seconds.
    async fn wait_for_pid_file(path: &Path) -> i32 {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            let found = fs::read_to_string(path)
                .ok()
                .and_then(|raw| raw.trim().parse::<i32>().ok())
                .filter(|pid| *pid > 0);
            if let Some(pid) = found {
                return pid;
            }
            assert!(
                Instant::now() < deadline,
                "timed out waiting for pid file: {}",
                path.display()
            );
            sleep(Duration::from_millis(20)).await;
        }
    }

    /// Probes `pid` with signal 0; anything other than `ESRCH` counts as "still there".
    fn process_exists(pid: i32) -> bool {
        let rc = unsafe { libc::kill(pid, 0) };
        rc == 0 || std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
    }

    /// Polls until `pid` is gone; panics if it is still around after two seconds.
    async fn wait_for_exit(pid: i32) {
        let deadline = Instant::now() + Duration::from_secs(2);
        while process_exists(pid) {
            assert!(
                Instant::now() < deadline,
                "process {} still running after shutdown",
                pid
            );
            sleep(Duration::from_millis(20)).await;
        }
    }

    #[test]
    fn default_command_runs_supervision() {
        let supervised = runs_process_supervision(None);
        assert!(supervised);
    }

    #[test]
    fn run_subcommand_runs_supervision() {
        let run = Commands::Run {
            config: None,
            tui: false,
            only: Vec::new(),
            except: Vec::new(),
        };
        assert!(runs_process_supervision(Some(&run)));
    }

    #[test]
    fn utility_commands_skip_supervision() {
        let chart = Commands::Chart { config: None };
        assert!(!runs_process_supervision(Some(&chart)));
    }

    /// A ship that started before a sibling failed to launch must be stopped again.
    #[tokio::test]
    async fn run_fleet_cleans_up_started_ships_when_launch_fails() {
        let workdir = tempdir().expect("temp dir");
        let pid_path = workdir.path().join("ship-child.pid");
        let manifest_path = workdir.path().join("ship-manifest.toml");
        let manifest = format!(
            r#"
[[fleet.web]]
name = "app"
command = "sh"
args = ["-c", "echo $$ > \"{pid_path}\"; sleep 1000"]
critical = false
[[fleet.web]]
name = "broken"
command = "/definitely/missing-command"
critical = false
"#,
            pid_path = pid_path.display()
        );
        fs::write(&manifest_path, manifest).expect("write manifest");

        let outcome = super::run_fleet(&manifest_path, false, super::TagFilter::default()).await;
        assert!(outcome.is_err(), "launch should fail");

        let child_pid = wait_for_pid_file(&pid_path).await;
        wait_for_exit(child_pid).await;
    }

    /// Filtering away a dependency must surface as a dependency validation error.
    #[tokio::test]
    async fn run_fleet_fails_when_tag_filter_removes_dependency() {
        let workdir = tempdir().expect("temp dir");
        let manifest_path = workdir.path().join("ship-manifest.toml");
        let manifest = r#"
[[fleet.jobs]]
name = "migrate"
command = "sh"
args = ["-c", "exit 0"]
tags = ["jobs"]
[[fleet.web]]
name = "app"
command = "sh"
args = ["-c", "sleep 1"]
depends_on = ["migrate"]
tags = ["web"]
"#;
        fs::write(&manifest_path, manifest).expect("write manifest");

        let web_only = super::TagFilter {
            only: vec!["web".to_string()],
            except: Vec::new(),
        };
        let outcome = super::run_fleet(&manifest_path, false, web_only).await;

        let error = outcome.expect_err("filtered dependency should fail");
        assert!(
            error.to_string().contains("depends on unknown vessel 'migrate'"),
            "unexpected error: {error}"
        );
    }
}