use std::collections::{HashMap, VecDeque};
use std::process::Stdio;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use breaker_machines::CircuitBreaker;
use state_machines::state_machine;
use crate::charter::{Bind, RouteConfig};
/// Maximum number of entries kept in a ship's in-memory log buffer.
const MAX_LOG_LINES: usize = 1000;
/// Maximum summed message size, in bytes, kept in a ship's log buffer.
const MAX_LOG_BYTES: usize = 1024 * 1024;
/// Longest stored log message in bytes; longer lines are cut by `LogBuffer::push`.
const MAX_LINE_LENGTH: usize = 4096;
/// Failure threshold passed to each ship's circuit breaker builder.
const RESTART_FAILURE_THRESHOLD: usize = 5;
/// Failure window, in seconds, passed to each ship's circuit breaker builder.
const RESTART_FAILURE_WINDOW_SECS: f64 = 60.0;
/// Used both as the breaker's half-open timeout and as the ship's backoff delay (seconds).
const RESTART_BACKOFF_SECS: f64 = 30.0;
/// Success threshold passed to each ship's circuit breaker builder.
const RESTART_SUCCESS_THRESHOLD: usize = 1;
/// Seconds after launch during which `Ship::in_grace_period` reports `true`.
const STARTUP_GRACE_SECS: u64 = 10;
fn spawn_log_forwarder<R>(
stream: Option<R>,
stream_name: &'static str,
ship_name: &str,
log_buffer: Arc<Mutex<LogBuffer>>,
) where
R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
if let Some(reader) = stream {
let ship_name = ship_name.to_string();
tokio::spawn(async move {
let reader = BufReader::new(reader);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if !SUPPRESS_STDOUT.load(Ordering::SeqCst) {
info!(ship = %ship_name, stream = stream_name, "{}", line);
}
log_buffer.lock().await.push(stream_name, line);
}
});
}
}
/// Seconds `Ship::terminate` waits after SIGTERM before escalating to SIGKILL.
const TERMINATE_TIMEOUT_SECS: u64 = 5;
/// Process-wide switch read by the log forwarders: while it is `true`,
/// child output is still stored in the log buffer but is not re-emitted
/// through `tracing`.
static SUPPRESS_STDOUT: AtomicBool = AtomicBool::new(false);

/// Enables (`true`) or disables (`false`) suppression of forwarded child output.
pub fn set_suppress_stdout(suppress: bool) {
    SUPPRESS_STDOUT.store(suppress, Ordering::SeqCst);
}

/// Returns the current value of the suppression switch.
pub fn is_stdout_suppressed() -> bool {
    SUPPRESS_STDOUT.load(Ordering::SeqCst)
}
/// One captured line of child output.
#[derive(Debug, Clone)]
pub struct LogEntry {
/// Label of the pipe the line came from (`"stdout"` or `"stderr"` in `Ship::launch`).
pub stream: &'static str,
/// The line's text, possibly truncated by `LogBuffer::push`.
pub message: String,
}
/// Bounded FIFO of recent log lines for one ship.
#[derive(Debug)]
struct LogBuffer {
/// Stored entries, oldest at the front.
entries: VecDeque<LogEntry>,
/// Running sum of `message.len()` over `entries`.
total_bytes: usize,
}
impl LogBuffer {
    /// Creates an empty buffer with capacity reserved for `MAX_LOG_LINES` entries.
    fn new() -> Self {
        Self {
            entries: VecDeque::with_capacity(MAX_LOG_LINES),
            total_bytes: 0,
        }
    }

    /// Removes the oldest entry and updates the byte count.
    ///
    /// Returns `false` when the buffer was already empty.
    fn evict_oldest(&mut self) -> bool {
        match self.entries.pop_front() {
            Some(old) => {
                self.total_bytes = self.total_bytes.saturating_sub(old.message.len());
                true
            }
            None => false,
        }
    }

    /// Appends one line tagged with `stream`.
    ///
    /// Messages longer than `MAX_LINE_LENGTH` bytes are shortened and suffixed
    /// with `"..."`. Oldest entries are evicted until both the byte budget
    /// (`MAX_LOG_BYTES`) and the line budget (`MAX_LOG_LINES`) have room.
    fn push(&mut self, stream: &'static str, mut message: String) {
        if message.len() > MAX_LINE_LENGTH {
            // `String::truncate` panics if the cut falls inside a multi-byte
            // character, so back up to the nearest boundary first. Offset 0
            // is always a boundary, so the loop terminates.
            let mut cut = MAX_LINE_LENGTH - 3;
            while !message.is_char_boundary(cut) {
                cut -= 1;
            }
            message.truncate(cut);
            message.push_str("...");
        }
        let msg_len = message.len();
        while self.total_bytes + msg_len > MAX_LOG_BYTES {
            if !self.evict_oldest() {
                break;
            }
        }
        while self.entries.len() >= MAX_LOG_LINES {
            if !self.evict_oldest() {
                break;
            }
        }
        self.total_bytes += msg_len;
        self.entries.push_back(LogEntry { stream, message });
    }

    /// Returns up to `limit` entries, newest first.
    fn recent(&self, limit: usize) -> Vec<LogEntry> {
        self.entries.iter().rev().take(limit).cloned().collect()
    }
}
/// Externally visible lifecycle status of a ship.
///
/// Each variant mirrors the same-named state of the `ShipLifecycle` state
/// machine (see `status_from_state`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShipStatus {
/// Initial state; also the fallback for unrecognised state names.
#[default]
Pending,
/// Reached via the `start` event.
Starting,
/// Reached via the `healthy` event.
Running,
/// Reached via the `unhealthy` event.
Unhealthy,
/// Reached via the `backoff` event.
Backoff,
/// Reached via the `stop` event.
Stopped,
/// Reached via the `crash` event.
Failed,
}
// Lifecycle state machine for a ship. The rest of this file drives it through
// `DynamicShipLifecycle` and `ShipLifecycleEvent`, which are presumably
// generated by this macro invocation (`dynamic: true`) — confirm against the
// `state_machines` crate documentation.
state_machine! {
name: ShipLifecycle,
context: (),
dynamic: true,
initial: Pending,
states: [
Pending,
Starting,
Running,
Unhealthy,
Backoff,
Stopped,
Failed,
],
events {
// A ship may be (re)started from any non-active state.
start {
transition: { from: [Pending, Stopped, Failed, Backoff], to: Starting }
}
healthy {
transition: { from: [Starting, Unhealthy], to: Running }
}
unhealthy {
transition: { from: [Starting, Running], to: Unhealthy }
}
crash {
transition: { from: [Starting, Running, Unhealthy], to: Failed }
}
backoff {
transition: { from: [Failed, Starting, Running, Unhealthy], to: Backoff }
}
// `stop` is accepted from every state except `Stopped` itself.
stop {
transition: { from: [Pending, Starting, Running, Unhealthy, Backoff, Failed], to: Stopped }
}
}
}
/// Mutable lifecycle state of a ship, kept behind one mutex in `Ship`.
#[derive(Debug)]
struct ShipState {
/// The lifecycle state machine; source of truth for `status`.
machine: DynamicShipLifecycle,
/// Cached `ShipStatus` derived from `machine` by `refresh_status`.
status: ShipStatus,
/// Instant at which the current backoff ends, if one was set by `Ship::enter_backoff`.
backoff_until: Option<Instant>,
}
impl ShipState {
fn new() -> Self {
let machine = DynamicShipLifecycle::new(());
let status = status_from_state(machine.current_state());
Self {
machine,
status,
backoff_until: None,
}
}
fn refresh_status(&mut self) {
self.status = status_from_state(self.machine.current_state());
}
}
/// Translates a state-machine state name into a `ShipStatus`.
///
/// Names that match no known state map to `ShipStatus::Pending`.
fn status_from_state(state: &str) -> ShipStatus {
    const KNOWN: [(&str, ShipStatus); 7] = [
        ("Pending", ShipStatus::Pending),
        ("Starting", ShipStatus::Starting),
        ("Running", ShipStatus::Running),
        ("Unhealthy", ShipStatus::Unhealthy),
        ("Backoff", ShipStatus::Backoff),
        ("Stopped", ShipStatus::Stopped),
        ("Failed", ShipStatus::Failed),
    ];
    KNOWN
        .iter()
        .find(|(label, _)| *label == state)
        .map_or(ShipStatus::Pending, |&(_, status)| status)
}
/// Owned, read-only copy of a ship's observable state, produced by `Ship::snapshot`.
#[derive(Debug, Clone)]
pub struct ShipSnapshot {
/// Ship name.
name: String,
/// Ship group.
group: String,
/// Program the ship runs.
command: String,
/// Lifecycle status at snapshot time.
status: ShipStatus,
/// OS process id, if the child handle reported one.
pid: Option<u32>,
/// Resolved healthcheck URL (see `Ship::healthcheck_url`), if any.
healthcheck: Option<String>,
/// Value of the ship's restart counter at snapshot time.
restart_count: u32,
/// Copy of the ship's `critical` flag.
critical: bool,
/// Copy of the ship's `oneshot` flag.
oneshot: bool,
/// Each configured route rendered with `to_string()`.
routes: Vec<String>,
}
impl ShipSnapshot {
    /// Ship name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Ship group.
    pub fn group(&self) -> &str {
        self.group.as_str()
    }

    /// Program the ship runs.
    pub fn command(&self) -> &str {
        self.command.as_str()
    }

    /// Lifecycle status captured in this snapshot.
    pub fn status(&self) -> ShipStatus {
        self.status
    }

    /// Process id captured in this snapshot, if any.
    pub fn pid(&self) -> Option<u32> {
        self.pid
    }

    /// Resolved healthcheck URL, if one was available.
    pub fn healthcheck(&self) -> Option<&str> {
        self.healthcheck.as_ref().map(String::as_str)
    }

    /// Restart counter value captured in this snapshot.
    pub fn restart_count(&self) -> u32 {
        self.restart_count
    }

    /// Whether the ship is flagged critical.
    pub fn is_critical(&self) -> bool {
        self.critical
    }

    /// Whether the ship is flagged oneshot.
    pub fn is_oneshot(&self) -> bool {
        self.oneshot
    }

    /// Routes rendered as strings.
    pub fn routes(&self) -> &[String] {
        self.routes.as_slice()
    }
}
/// Builder for `Ship`; collects configuration before `build` wires up runtime state.
#[derive(Default)]
pub struct ShipBuilder {
/// Ship name; also used to name the ship's circuit breaker.
name: String,
/// Ship group (empty unless set).
group: String,
/// Program to execute.
command: String,
/// Arguments passed to `command`.
args: Vec<String>,
/// Extra environment variables for the child process.
env: HashMap<String, String>,
/// Optional bind configuration; used to derive the healthcheck base URL.
bind: Option<Bind>,
/// Healthcheck endpoint: an absolute http(s) URL or a suffix appended to the bind base.
healthcheck: Option<String>,
/// Dependency list; presumably names of ships to start first — not interpreted in this file.
depends_on: Vec<String>,
/// Route configuration entries.
routes: Vec<RouteConfig>,
/// Critical flag; `ShipBuilder::new` sets it to `true`.
critical: bool,
/// Oneshot flag; defaults to `false`.
oneshot: bool,
}
impl ShipBuilder {
    /// Starts a builder for a ship called `name` that runs `command`.
    ///
    /// The ship is marked critical; every other setting starts at its
    /// `Default` value.
    pub fn new(name: impl Into<String>, command: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            command: command.into(),
            critical: true,
            ..Self::default()
        }
    }

    /// Sets the group the ship belongs to.
    pub fn group(self, group: impl Into<String>) -> Self {
        Self { group: group.into(), ..self }
    }

    /// Replaces the command-line arguments.
    pub fn args(self, args: Vec<String>) -> Self {
        Self { args, ..self }
    }

    /// Replaces the extra environment variables.
    pub fn env(self, env: HashMap<String, String>) -> Self {
        Self { env, ..self }
    }

    /// Sets or clears the bind configuration.
    pub fn bind(self, bind: Option<Bind>) -> Self {
        Self { bind, ..self }
    }

    /// Sets or clears the healthcheck endpoint.
    pub fn healthcheck(self, healthcheck: Option<String>) -> Self {
        Self { healthcheck, ..self }
    }

    /// Replaces the dependency list.
    pub fn depends_on(self, depends_on: Vec<String>) -> Self {
        Self { depends_on, ..self }
    }

    /// Replaces the route configuration.
    pub fn routes(self, routes: Vec<RouteConfig>) -> Self {
        Self { routes, ..self }
    }

    /// Sets the critical flag.
    pub fn critical(self, critical: bool) -> Self {
        Self { critical, ..self }
    }

    /// Sets the oneshot flag.
    pub fn oneshot(self, oneshot: bool) -> Self {
        Self { oneshot, ..self }
    }

    /// Consumes the builder and produces a `Ship` with fresh runtime state:
    /// a new lifecycle machine, no child process, an empty log buffer, a
    /// zeroed restart counter and a circuit breaker named `ship:<name>`
    /// configured from the `RESTART_*` constants.
    pub fn build(self) -> Ship {
        let Self {
            name,
            group,
            command,
            args,
            env,
            bind,
            healthcheck,
            depends_on,
            routes,
            critical,
            oneshot,
        } = self;

        let breaker = CircuitBreaker::builder(format!("ship:{}", name))
            .failure_threshold(RESTART_FAILURE_THRESHOLD)
            .failure_window_secs(RESTART_FAILURE_WINDOW_SECS)
            .half_open_timeout_secs(RESTART_BACKOFF_SECS)
            .success_threshold(RESTART_SUCCESS_THRESHOLD)
            .build();

        Ship {
            name,
            group,
            command,
            args,
            env,
            bind,
            healthcheck,
            depends_on,
            routes,
            critical,
            oneshot,
            state: Arc::new(Mutex::new(ShipState::new())),
            child: Arc::new(Mutex::new(None)),
            process_meta: Arc::new(Mutex::new(ProcessMeta::default())),
            last_health_check: Arc::new(Mutex::new(None)),
            restart_count: Arc::new(Mutex::new(0)),
            log_buffer: Arc::new(Mutex::new(LogBuffer::new())),
            breaker: Arc::new(Mutex::new(breaker)),
            backoff_delay: Duration::from_secs_f64(RESTART_BACKOFF_SECS),
            memory_sampler: Arc::new(Mutex::new(None)),
            memory_metrics: Arc::new(Mutex::new(None)),
        }
    }
}
/// Bookkeeping about the currently launched process.
#[derive(Debug, Default)]
struct ProcessMeta {
/// Process-group id used by `Ship::terminate`; `launch` records the child's pid here.
pgid: Option<i32>,
/// When the process was launched; drives `Ship::in_grace_period`.
launched_at: Option<Instant>,
}
/// A supervised child process ("ship") plus its runtime state.
///
/// The public fields are static configuration; the private fields are shared,
/// mutex-guarded runtime state.
pub struct Ship {
/// Ship name; exported to the child as `MS_SHIP`.
pub name: String,
/// Ship group.
pub group: String,
/// Program to execute.
pub command: String,
/// Arguments passed to `command`.
pub args: Vec<String>,
/// Extra environment variables for the child.
pub env: HashMap<String, String>,
/// Optional bind configuration used to build the healthcheck URL.
pub bind: Option<Bind>,
/// Healthcheck endpoint (absolute URL or suffix of the bind base).
pub healthcheck: Option<String>,
/// Dependency list; presumably ship names — not interpreted in this file.
pub depends_on: Vec<String>,
/// Route configuration entries.
pub routes: Vec<RouteConfig>,
/// Critical flag (not interpreted in this file).
pub critical: bool,
/// Oneshot flag (not interpreted in this file).
pub oneshot: bool,
/// Lifecycle machine, cached status and backoff deadline.
state: Arc<Mutex<ShipState>>,
/// Handle of the most recently spawned child, if any.
child: Arc<Mutex<Option<Child>>>,
/// Process-group id and launch time of the current process.
process_meta: Arc<Mutex<ProcessMeta>>,
/// Time of the last `mark_healthy` call.
last_health_check: Arc<Mutex<Option<Instant>>>,
/// Counter bumped by `increment_restart`.
restart_count: Arc<Mutex<u32>>,
/// Captured stdout/stderr lines.
log_buffer: Arc<Mutex<LogBuffer>>,
/// Circuit breaker fed by health failures and crashes.
breaker: Arc<Mutex<CircuitBreaker>>,
/// How long `enter_backoff` keeps the ship in backoff.
backoff_delay: Duration,
/// Handle of the background memory-sampling task, if running.
memory_sampler: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
/// Most recent memory sample, if any.
memory_metrics: Arc<Mutex<Option<crate::monitoring::MemorySample>>>,
}
impl Ship {
/// Name used for this ship in log output.
pub fn display_name(&self) -> &str {
    self.name.as_str()
}
/// Feeds `event` to the lifecycle machine and refreshes the cached status
/// when the machine accepts it. Rejected events are ignored.
async fn transition(&self, event: ShipLifecycleEvent) {
    let mut guard = self.state.lock().await;
    let accepted = guard.machine.handle(event).is_ok();
    if accepted {
        guard.refresh_status();
    }
}
/// Returns the cached lifecycle status.
pub async fn status(&self) -> ShipStatus {
    let guard = self.state.lock().await;
    guard.status
}
/// Returns the current restart counter.
pub async fn restart_count(&self) -> u32 {
    let counter = self.restart_count.lock().await;
    *counter
}
/// Adds one to the restart counter.
pub async fn increment_restart(&self) {
    *self.restart_count.lock().await += 1;
}
pub async fn launch(&self) -> anyhow::Result<()> {
let name = self.display_name();
if self.is_running().await {
warn!(ship = name, "Ship already running, skipping launch");
return Ok(());
}
info!(ship = name, command = %self.command, "Launching ship");
self.transition(ShipLifecycleEvent::Start).await;
let mut cmd = Command::new(&self.command);
let mothership_pid = std::process::id();
let socket_dir = std::env::var("MS_SOCKET_DIR").unwrap_or_else(|_| {
std::env::var("XDG_RUNTIME_DIR")
.unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().to_string())
+ "/mothership"
});
cmd.args(&self.args)
.envs(&self.env)
.env("NO_COLOR", "1")
.env("MS_PID", mothership_pid.to_string())
.env("MS_SHIP", &self.name)
.env("MS_SOCKET_DIR", socket_dir)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
let parent_pid = std::process::id() as i32;
unsafe {
cmd.pre_exec(move || {
if libc::setpgid(0, 0) != 0 {
}
let pgid = libc::getpid(); super::parent_death::setup_parent_death_signal_preexec(
parent_pid,
pgid,
libc::SIGTERM,
);
Ok(())
});
}
let mut child = cmd.spawn()?;
let pid = child.id();
spawn_log_forwarder(child.stdout.take(), "stdout", name, self.log_buffer.clone());
spawn_log_forwarder(child.stderr.take(), "stderr", name, self.log_buffer.clone());
*self.child.lock().await = Some(child);
{
let mut meta = self.process_meta.lock().await;
meta.pgid = pid.map(|p| p as i32);
meta.launched_at = Some(Instant::now());
}
if let Some(p) = pid {
info!(ship = name, pid = p, "Ship launched");
} else {
info!(ship = name, "Ship launched");
}
if let Some(p) = pid {
let sampler = self.spawn_memory_sampler(p);
*self.memory_sampler.lock().await = Some(sampler);
}
Ok(())
}
/// Returns `true` while fewer than `STARTUP_GRACE_SECS` seconds have passed
/// since the recorded launch time; `false` when no launch time is recorded.
pub async fn in_grace_period(&self) -> bool {
    let grace = Duration::from_secs(STARTUP_GRACE_SECS);
    let meta = self.process_meta.lock().await;
    meta.launched_at
        .is_some_and(|started| started.elapsed() < grace)
}
/// Returns `true` only when a child handle exists and a non-blocking wait
/// reports that it has not exited. A failed wait counts as not running.
pub async fn is_running(&self) -> bool {
    let mut guard = self.child.lock().await;
    match guard.as_mut() {
        Some(child) => matches!(child.try_wait(), Ok(None)),
        None => false,
    }
}
pub async fn wait(&self) -> anyhow::Result<i32> {
let mut child_guard = self.child.lock().await;
if let Some(child) = child_guard.as_mut() {
let status = child.wait().await?;
let code = status.code().unwrap_or(-1);
if code == 0 {
self.transition(ShipLifecycleEvent::Stop).await;
} else {
self.transition(ShipLifecycleEvent::Crash).await;
}
Ok(code)
} else {
Ok(-1)
}
}
/// Stops the ship: SIGTERM first, SIGKILL after `TERMINATE_TIMEOUT_SECS`.
///
/// Signals go to the recorded process group when one is known, falling back
/// to the child's own pid if the group signal fails. Afterwards the process
/// metadata and memory metrics are cleared, the memory sampler is aborted
/// and the `Stop` lifecycle event is fired — also when there was no child.
/// Signal-delivery errors are ignored; this function currently always
/// returns `Ok(())`.
pub async fn terminate(&self) -> anyhow::Result<()> {
let name = self.display_name();
info!(ship = name, "Terminating ship");
// Copy the pgid out so the `process_meta` lock is released before signalling.
let pgid = self.process_meta.lock().await.pgid;
// Held until the end of the function.
let mut child_guard = self.child.lock().await;
if let Some(child) = child_guard.as_mut() {
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
if let Some(pgid) = pgid {
debug!(ship = name, pgid = pgid, "Sending SIGTERM to process group");
// Fall back to the single pid if the group signal could not be delivered.
if signal::killpg(Pid::from_raw(pgid), Signal::SIGTERM).is_err()
&& let Some(pid) = child.id()
{
let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
}
} else if let Some(pid) = child.id() {
let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
}
let timeout = Duration::from_secs(TERMINATE_TIMEOUT_SECS);
// Race the grace period against the child's exit.
tokio::select! {
_ = tokio::time::sleep(timeout) => {
// Grace period elapsed: escalate to SIGKILL.
if let Some(pgid) = pgid {
debug!(ship = name, pgid = pgid, "Sending SIGKILL to process group");
if signal::killpg(Pid::from_raw(pgid), Signal::SIGKILL).is_err()
&& let Some(pid) = child.id() {
let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL);
}
}
// Always kill the direct child as well; the result is ignored.
let _ = child.kill().await;
}
_ = child.wait() => {
debug!(ship = name, "Ship exited gracefully");
}
}
}
{
let mut meta = self.process_meta.lock().await;
meta.pgid = None;
meta.launched_at = None;
}
// Stop background sampling and drop the last sample.
if let Some(handle) = self.memory_sampler.lock().await.take() {
handle.abort();
}
*self.memory_metrics.lock().await = None;
self.transition(ShipLifecycleEvent::Stop).await;
Ok(())
}
/// Returns the child's process id as reported by its handle, or `None`
/// when there is no child handle.
pub async fn pid(&self) -> Option<u32> {
    let guard = self.child.lock().await;
    guard.as_ref().and_then(|child| child.id())
}
fn spawn_memory_sampler(&self, pid: u32) -> tokio::task::JoinHandle<()> {
let metrics = self.memory_metrics.clone();
let name = self.name.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
loop {
interval.tick().await;
match crate::monitoring::sample_process_memory(pid) {
Ok(sample) => {
*metrics.lock().await = Some(sample);
}
Err(crate::monitoring::MemoryError::ProcessNotFound(_)) => {
break;
}
Err(e) => {
warn!(ship = %name, pid = pid, error = %e, "Memory sampling failed");
}
}
}
})
}
/// Returns a copy of the most recent memory sample, if any.
pub async fn memory_sample(&self) -> Option<crate::monitoring::MemorySample> {
    let latest = self.memory_metrics.lock().await;
    latest.clone()
}
/// Resolves the healthcheck endpoint to a URL.
///
/// An endpoint that already starts with `http://` or `https://` is returned
/// unchanged; otherwise it is appended to the bind's healthcheck base.
/// Returns `None` when no healthcheck is configured, or when a relative
/// endpoint has no bind to resolve against.
pub fn healthcheck_url(&self) -> Option<String> {
    let endpoint = self.healthcheck.as_ref()?;
    let is_absolute = ["http://", "https://"]
        .iter()
        .any(|scheme| endpoint.starts_with(scheme));
    if is_absolute {
        return Some(endpoint.clone());
    }
    self.bind
        .as_ref()
        .map(|bind| format!("{}{}", bind.healthcheck_base(), endpoint))
}
/// Fires the `Healthy` lifecycle event and records the current time as the
/// last health check. The timestamp is updated even if the event is rejected.
pub async fn mark_healthy(&self) {
    self.transition(ShipLifecycleEvent::Healthy).await;
    let checked_at = Instant::now();
    *self.last_health_check.lock().await = Some(checked_at);
}
/// Fires the `Unhealthy` lifecycle event.
pub async fn mark_unhealthy(&self) {
    let event = ShipLifecycleEvent::Unhealthy;
    self.transition(event).await;
}
/// Fires the `Crash` lifecycle event.
pub async fn mark_failed(&self) {
    let event = ShipLifecycleEvent::Crash;
    self.transition(event).await;
}
/// Reports a success (with a duration of `0.0`) to the circuit breaker.
pub async fn record_health_success(&self) {
    self.breaker.lock().await.record_success(0.0);
}
/// Reports a health-check failure to the circuit breaker.
///
/// Returns whether the breaker is open after recording it.
pub async fn record_health_failure(&self) -> bool {
    let mut guard = self.breaker.lock().await;
    guard.record_failure_and_maybe_trip(0.0);
    let tripped = guard.is_open();
    tripped
}
/// Reports a crash to the circuit breaker.
///
/// Returns whether the breaker is open after recording it.
pub async fn record_crash(&self) -> bool {
    let mut guard = self.breaker.lock().await;
    guard.record_failure_and_maybe_trip(0.0);
    let tripped = guard.is_open();
    tripped
}
/// Resets the circuit breaker.
pub async fn reset_breaker(&self) {
    self.breaker.lock().await.reset();
}
/// Fires the `Backoff` lifecycle event and sets the backoff deadline to
/// now plus `backoff_delay`. The deadline is stored even if the event is
/// rejected by the state machine.
pub async fn enter_backoff(&self) {
    let deadline = Instant::now() + self.backoff_delay;
    let mut guard = self.state.lock().await;
    let accepted = guard.machine.handle(ShipLifecycleEvent::Backoff).is_ok();
    if accepted {
        guard.refresh_status();
    }
    guard.backoff_until = Some(deadline);
}
/// Removes the backoff deadline; the lifecycle state is left untouched.
pub async fn clear_backoff(&self) {
    self.state.lock().await.backoff_until = None;
}
/// Returns `true` when the cached status is `ShipStatus::Backoff`.
pub async fn is_backing_off(&self) -> bool {
    let current = self.state.lock().await.status;
    matches!(current, ShipStatus::Backoff)
}
/// Returns `true` when no backoff deadline is set or the deadline has passed.
pub async fn backoff_expired(&self) -> bool {
    let guard = self.state.lock().await;
    guard
        .backoff_until
        .is_none_or(|deadline| Instant::now() >= deadline)
}
/// Returns up to `limit` of the most recently captured log lines, newest first.
pub async fn logs(&self, limit: usize) -> Vec<LogEntry> {
    let buffer = self.log_buffer.lock().await;
    buffer.recent(limit)
}
/// Captures the ship's configuration and current runtime state.
///
/// Status, pid and restart count are read one after another under
/// separate locks, so the values are not taken at a single instant.
pub async fn snapshot(&self) -> ShipSnapshot {
    let status = self.status().await;
    let pid = self.pid().await;
    let healthcheck = self.healthcheck_url();
    let restart_count = self.restart_count().await;
    let routes = self.routes.iter().map(|route| route.to_string()).collect();
    ShipSnapshot {
        name: self.name.clone(),
        group: self.group.clone(),
        command: self.command.clone(),
        status,
        pid,
        healthcheck,
        restart_count,
        critical: self.critical,
        oneshot: self.oneshot,
        routes,
    }
}
}
#[cfg(all(test, unix))]
mod tests {
    use std::{collections::HashMap, fs, path::Path, time::Duration};
    use tempfile::tempdir;
    use tokio::time::{Instant, sleep};
    use super::ShipBuilder;

    /// How long the polling helpers wait before giving up.
    const POLL_TIMEOUT: Duration = Duration::from_secs(2);
    /// Pause between two polls.
    const POLL_INTERVAL: Duration = Duration::from_millis(20);

    /// Reads a positive pid from `path`, or `None` if the file is missing,
    /// unparsable or holds a non-positive number.
    fn read_pid(path: &Path) -> Option<i32> {
        let contents = fs::read_to_string(path).ok()?;
        contents.trim().parse::<i32>().ok().filter(|pid| *pid > 0)
    }

    /// Polls `path` until it contains a positive pid; panics after the timeout.
    async fn wait_for_pid_file(path: &Path) -> i32 {
        let deadline = Instant::now() + POLL_TIMEOUT;
        loop {
            if let Some(pid) = read_pid(path) {
                return pid;
            }
            assert!(
                Instant::now() < deadline,
                "timed out waiting for pid file: {}",
                path.display()
            );
            sleep(POLL_INTERVAL).await;
        }
    }

    /// Probes `pid` with signal 0; only `ESRCH` counts as "no such process".
    fn process_exists(pid: i32) -> bool {
        let rc = unsafe { libc::kill(pid, 0) };
        rc == 0 || std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
    }

    /// Polls until `pid` no longer exists; panics after the timeout.
    async fn wait_for_exit(pid: i32) {
        let deadline = Instant::now() + POLL_TIMEOUT;
        while process_exists(pid) {
            assert!(
                Instant::now() < deadline,
                "process {} still running after shutdown",
                pid
            );
            sleep(POLL_INTERVAL).await;
        }
    }

    /// `terminate` must also take down a background grandchild started by the
    /// ship's shell, not just the shell itself.
    #[tokio::test]
    async fn terminate_kills_process_group() {
        let temp = tempdir().expect("temp dir");
        let pid_path = temp.path().join("ship-child.pid");
        let env = HashMap::from([(
            "PID_FILE".to_string(),
            pid_path.to_string_lossy().to_string(),
        )]);
        // The shell backgrounds `sleep`, writes its pid to the file, then waits.
        let script = "sleep 1000 & echo $! > \"$PID_FILE\"; wait";
        let ship = ShipBuilder::new("test-ship", "sh")
            .args(vec!["-c".to_string(), script.to_string()])
            .env(env)
            .critical(false)
            .build();

        ship.launch().await.expect("launch ship");
        let child_pid = wait_for_pid_file(&pid_path).await;

        ship.terminate().await.expect("terminate ship");
        assert!(!ship.is_running().await, "ship still running");
        wait_for_exit(child_pid).await;
    }
}