use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use crate::charter::RouteConfig;
use crate::docking::DockingConnector;
use super::ship::is_stdout_suppressed;
/// Lifecycle state of a bay, as recorded by the `Bay` methods in this module.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum BayStatus {
    /// Created but not yet launched; the initial state.
    #[default]
    Pending,
    /// `launch` has started and is preparing/spawning the process.
    Starting,
    /// The process was spawned and the bay is waiting for its docking socket.
    Docking,
    /// A docking connection has been established.
    Docked,
    /// The process exited or timed out before creating its socket, or connecting failed.
    Failed,
    /// `terminate` has finished.
    Stopped,
}
/// A child process ("bay") that this module spawns, waits on for a socket file
/// at `socket_path`, connects to via `DockingConnector`, and stops via `terminate`.
///
/// `child` is spawned with `kill_on_drop(true)` (see `launch`), so dropping the
/// held `Child` kills a process that is still running.
pub struct Bay {
/// Bay name: used for the socket file name (`<name>.sock`), exported to the child
/// as `MS_SHIP`, and used as the `[<name>]` prefix on forwarded output lines.
pub name: String,
/// Exported to the child as `MS_BAY_TYPE` and included in launch logs.
pub bay_type: String,
/// Program executed by `launch`.
pub command: String,
/// Arguments passed to `command`.
pub args: Vec<String>,
/// Extra environment for the child. These are applied before the `MS_*` and
/// `NO_COLOR` variables set in `launch`, which therefore win on a name clash.
pub env: HashMap<String, String>,
/// Not read in this file — presumably names of bays that must be started first
/// (NOTE(review): confirm against the orchestrator).
pub depends_on: Vec<String>,
/// Not read in this file — presumably routing rules served by this bay
/// (NOTE(review): confirm).
pub routes: Vec<RouteConfig>,
/// Not read in this file — presumably whether this bay's failure is fatal
/// (NOTE(review): confirm).
pub critical: bool,
/// Passed to `DockingConnector::connect` when docking.
pub config: HashMap<String, String>,
// Where the child is expected to create its docking socket; derived in `new`.
socket_path: PathBuf,
// Current lifecycle state, behind an async mutex so `&self` methods can update it.
status: Arc<Mutex<BayStatus>>,
// The spawned process; `None` until `launch` spawns it.
child: Arc<Mutex<Option<Child>>>,
// Docking connection; set on successful docking, cleared by `terminate`.
connector: Arc<Mutex<Option<Arc<DockingConnector>>>>,
}
impl Bay {
#[allow(clippy::too_many_arguments)]
pub fn new(
name: String,
bay_type: String,
command: String,
args: Vec<String>,
env: HashMap<String, String>,
depends_on: Vec<String>,
routes: Vec<RouteConfig>,
critical: bool,
config: HashMap<String, String>,
) -> Self {
let socket_dir = std::env::var("MS_SOCKET_DIR").unwrap_or_else(|_| {
std::env::var("XDG_RUNTIME_DIR")
.map(|d| format!("{}/mothership", d))
.unwrap_or_else(|_| "/tmp/mothership".to_string())
});
let socket_path = PathBuf::from(socket_dir).join(format!("{}.sock", name));
Self {
name,
bay_type,
command,
args,
env,
depends_on,
routes,
critical,
config,
socket_path,
status: Arc::new(Mutex::new(BayStatus::Pending)),
child: Arc::new(Mutex::new(None)),
connector: Arc::new(Mutex::new(None)),
}
}
pub async fn status(&self) -> BayStatus {
*self.status.lock().await
}
pub async fn connector(&self) -> Option<Arc<DockingConnector>> {
self.connector.lock().await.clone()
}
pub async fn launch(&self) -> anyhow::Result<()> {
info!(bay = %self.name, bay_type = %self.bay_type, "Launching bay");
*self.status.lock().await = BayStatus::Starting;
if let Some(parent) = self.socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let _ = std::fs::remove_file(&self.socket_path);
let mut cmd = Command::new(&self.command);
let mothership_pid = std::process::id();
let socket_dir = self
.socket_path
.parent()
.unwrap()
.to_string_lossy()
.to_string();
cmd.args(&self.args)
.envs(&self.env)
.env("NO_COLOR", "1")
.env("MS_PID", mothership_pid.to_string())
.env("MS_SHIP", &self.name)
.env("MS_SOCKET_DIR", &socket_dir)
.env(
"MS_SOCKET_PATH",
self.socket_path.to_string_lossy().to_string(),
)
.env("MS_BAY_TYPE", &self.bay_type)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true);
let parent_pid = std::process::id() as i32;
unsafe {
cmd.pre_exec(move || {
let _ = libc::setpgid(0, 0);
let pgid = libc::getpid(); super::parent_death::setup_parent_death_signal_preexec(
parent_pid,
pgid,
libc::SIGTERM,
);
Ok(())
});
}
let mut child = cmd.spawn()?;
let pid = child.id();
let name = self.name.clone();
if let Some(stdout) = child.stdout.take() {
let name = name.clone();
tokio::spawn(async move {
let reader = BufReader::new(stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if !is_stdout_suppressed() {
if line.starts_with('{') {
println!("{}", line);
} else {
println!("[{}] {}", name, line);
}
}
}
});
}
if let Some(stderr) = child.stderr.take() {
let name = name.clone();
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if !is_stdout_suppressed() {
if line.starts_with('{') {
eprintln!("{}", line);
} else {
eprintln!("[{}] {}", name, line);
}
}
}
});
}
*self.child.lock().await = Some(child);
info!(bay = %self.name, pid = ?pid, "Bay process launched");
*self.status.lock().await = BayStatus::Docking;
self.establish_docking().await?;
Ok(())
}
async fn establish_docking(&self) -> anyhow::Result<()> {
info!(bay = %self.name, socket = %self.socket_path.display(), "Waiting for docking socket");
let timeout = Duration::from_secs(30);
let start = std::time::Instant::now();
while start.elapsed() < timeout {
if self.socket_path.exists() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
if !self.is_running().await {
*self.status.lock().await = BayStatus::Failed;
anyhow::bail!("Bay process died before creating socket");
}
}
if !self.socket_path.exists() {
*self.status.lock().await = BayStatus::Failed;
anyhow::bail!("Timeout waiting for bay socket");
}
tokio::time::sleep(Duration::from_millis(100)).await;
match DockingConnector::connect(&self.name, self.socket_path.clone(), self.config.clone())
.await
{
Ok(connector) => {
info!(bay = %self.name, "Docking established");
*self.connector.lock().await = Some(Arc::new(connector));
*self.status.lock().await = BayStatus::Docked;
Ok(())
}
Err(e) => {
error!(bay = %self.name, error = %e, "Failed to establish docking");
*self.status.lock().await = BayStatus::Failed;
Err(e)
}
}
}
pub async fn is_running(&self) -> bool {
let mut child_guard = self.child.lock().await;
if let Some(child) = child_guard.as_mut() {
match child.try_wait() {
Ok(Some(_)) => false,
Ok(None) => true,
Err(_) => false,
}
} else {
false
}
}
pub async fn is_docked(&self) -> bool {
*self.status.lock().await == BayStatus::Docked
}
pub async fn terminate(&self) -> anyhow::Result<()> {
info!(bay = %self.name, "Terminating bay");
*self.connector.lock().await = None;
let mut child_guard = self.child.lock().await;
if let Some(child) = child_guard.as_mut() {
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
if let Some(pid) = child.id() {
let pgid = pid as i32;
if signal::killpg(Pid::from_raw(pgid), Signal::SIGTERM).is_err() {
let _ = signal::kill(Pid::from_raw(pgid), Signal::SIGTERM);
}
}
let timeout = Duration::from_secs(5);
tokio::select! {
_ = tokio::time::sleep(timeout) => {
if let Some(pid) = child.id() {
let pgid = pid as i32;
if signal::killpg(Pid::from_raw(pgid), Signal::SIGKILL).is_err() {
let _ = signal::kill(Pid::from_raw(pgid), Signal::SIGKILL);
}
}
let _ = child.kill().await;
}
_ = child.wait() => {
debug!(bay = %self.name, "Bay exited gracefully");
}
}
}
let _ = std::fs::remove_file(&self.socket_path);
*self.status.lock().await = BayStatus::Stopped;
Ok(())
}
pub async fn pid(&self) -> Option<u32> {
let child_guard = self.child.lock().await;
child_guard.as_ref().and_then(|c| c.id())
}
pub fn socket_path(&self) -> &PathBuf {
&self.socket_path
}
}
impl std::fmt::Debug for Bay {
    // Prints only `name`, `bay_type` and `socket_path`; every other field,
    // including the mutex-guarded runtime state, is left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Bay");
        builder.field("name", &self.name);
        builder.field("bay_type", &self.bay_type);
        builder.field("socket_path", &self.socket_path);
        builder.finish()
    }
}
// Unix-only: these tests spawn `sh`, call `setpgid` in `pre_exec`, and probe
// process existence with `libc::kill`.
#[cfg(all(test, unix))]
mod tests {
use std::{collections::HashMap, fs, path::Path, process::Stdio, time::Duration};
use tempfile::tempdir;
use tokio::process::Command;
use tokio::time::{Instant, sleep};
use super::Bay;
// Polls `path` every 20 ms until it contains a positive integer PID and returns
// it; panics after 2 s. A partially written file fails to parse and is retried.
async fn wait_for_pid_file(path: &Path) -> i32 {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
if let Ok(contents) = fs::read_to_string(path)
&& let Ok(pid) = contents.trim().parse::<i32>()
&& pid > 0
{
return pid;
}
if Instant::now() >= deadline {
panic!("timed out waiting for pid file: {}", path.display());
}
sleep(Duration::from_millis(20)).await;
}
}
// Returns true if `pid` refers to an existing process. `kill(pid, 0)` delivers
// no signal; any failure other than ESRCH (e.g. EPERM) is treated as "exists".
fn process_exists(pid: i32) -> bool {
let result = unsafe { libc::kill(pid, 0) };
if result == 0 {
return true;
}
std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
}
// Polls every 20 ms until `pid` no longer exists; panics if it survives 2 s.
async fn wait_for_exit(pid: i32) {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
if !process_exists(pid) {
return;
}
if Instant::now() >= deadline {
panic!("process {} still running after shutdown", pid);
}
sleep(Duration::from_millis(20)).await;
}
}
// The bay's child is a shell that leads its own process group and backgrounds
// `sleep 1000` (a grandchild in the same group), recording the grandchild's PID.
// After `terminate`, the socket file must be gone and the grandchild must have
// exited, which shows the signal reached the group and not only the shell.
#[tokio::test]
async fn terminate_kills_process_group_and_cleans_socket() {
let temp = tempdir().expect("temp dir");
let pid_path = temp.path().join("bay-child.pid");
let socket_path = temp.path().join("bay.sock");
let mut bay = Bay::new(
"test-bay".to_string(),
"test".to_string(),
"sh".to_string(),
vec![],
HashMap::new(),
vec![],
vec![],
false,
HashMap::new(),
);
// Point the bay at a temp socket path instead of the env-derived default, and
// create a plain stand-in file that `terminate` is expected to delete.
bay.socket_path = socket_path.clone();
fs::write(&socket_path, b"stub").expect("create socket stub");
let mut cmd = Command::new("sh");
cmd.args(["-c", "sleep 1000 & echo $! > \"$PID_FILE\"; wait"])
.env("PID_FILE", pid_path.to_string_lossy().to_string())
.stdout(Stdio::null())
.stderr(Stdio::null())
.kill_on_drop(true);
// SAFETY: `setpgid` and reading `errno` are async-signal-safe, as required
// between fork and exec. Unlike `launch`, a failure here aborts the spawn so the
// test cannot silently run without its own process group.
unsafe {
cmd.pre_exec(|| {
if libc::setpgid(0, 0) != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
});
}
// Install the child directly, bypassing `launch` and its docking handshake.
let child = cmd.spawn().expect("spawn bay child");
*bay.child.lock().await = Some(child);
// Only terminate once the grandchild is known to be running.
let child_pid = wait_for_pid_file(&pid_path).await;
bay.terminate().await.expect("terminate bay");
assert!(!socket_path.exists(), "socket file not removed");
wait_for_exit(child_pid).await;
}
}