use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use color_eyre::eyre::{Result, eyre};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use crate::bee_log_writer::BeeLogWriter;
use crate::config::BeeLogsConfig;
const HEALTH_POLL_INTERVAL: Duration = Duration::from_millis(500);
const DEFAULT_SHUTDOWN_GRACE: Duration = Duration::from_secs(5);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BeeStatus {
Running,
Exited(i32),
Signaled(i32),
UnknownExit(String),
}
impl BeeStatus {
pub fn is_running(&self) -> bool {
matches!(self, BeeStatus::Running)
}
pub fn label(&self) -> String {
match self {
BeeStatus::Running => "bee running".to_string(),
BeeStatus::Exited(0) => "bee exited cleanly".to_string(),
BeeStatus::Exited(code) => format!("bee exited (code {code})"),
BeeStatus::Signaled(sig) => format!("bee killed (signal {sig})"),
BeeStatus::UnknownExit(msg) => format!("bee exited: {msg}"),
}
}
}
pub struct BeeSupervisor {
child: Child,
pgid: Option<i32>,
log_path: PathBuf,
started_at: Instant,
}
impl BeeSupervisor {
pub fn spawn(bin: &Path, config: &Path, log_cfg: BeeLogsConfig) -> Result<Self> {
if !bin.exists() {
return Err(eyre!(
"bee binary not found at {:?} — check [bee].bin / --bee-bin",
bin
));
}
if !config.exists() {
return Err(eyre!(
"bee config not found at {:?} — check [bee].config / --bee-config",
config
));
}
let log_path = std::env::temp_dir().join(format!(
"bee-tui-spawned-{}.log",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
));
let writer =
BeeLogWriter::open(log_path.clone(), log_cfg.rotate_size_mb, log_cfg.keep_files)
.map_err(|e| {
eyre!(
"failed to open rotating log writer at {log_path:?}: {e} \
(check $TMPDIR is writable and has free space)"
)
})?;
let writer = Arc::new(Mutex::new(writer));
let mut cmd = Command::new(bin);
cmd.arg("start")
.arg("--config")
.arg(config)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null())
.kill_on_drop(true);
#[cfg(unix)]
{
unsafe {
cmd.pre_exec(|| {
if libc::setpgid(0, 0) == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(())
});
}
}
let mut child = cmd.spawn().map_err(|e| {
eyre!(
"failed to spawn {:?}: {e} (check the binary is executable)",
bin
)
})?;
let pgid = child.id().map(|pid| pid as i32);
if let Some(stdout) = child.stdout.take() {
spawn_pipe_pump(stdout, writer.clone(), "stdout");
}
if let Some(stderr) = child.stderr.take() {
spawn_pipe_pump(stderr, writer.clone(), "stderr");
}
Ok(Self {
child,
pgid,
log_path,
started_at: Instant::now(),
})
}
pub fn log_path(&self) -> &Path {
&self.log_path
}
pub fn pid(&self) -> Option<u32> {
self.child.id()
}
pub fn uptime(&self) -> Duration {
self.started_at.elapsed()
}
pub fn status(&mut self) -> BeeStatus {
match self.child.try_wait() {
Ok(None) => BeeStatus::Running,
Ok(Some(s)) => exit_status_to_bee_status(&s),
Err(e) => BeeStatus::UnknownExit(e.to_string()),
}
}
pub async fn wait_for_api(&mut self, base_url: &str, timeout: Duration) -> Result<()> {
let client = bee::Client::new(base_url)
.map_err(|e| eyre!("invalid bee endpoint {base_url}: {e}"))?;
let deadline = Instant::now() + timeout;
loop {
match self.status() {
BeeStatus::Running => {}
terminal => {
return Err(eyre!(
"{} before its API became reachable; tail {} for the cause",
terminal.label(),
self.log_path.display()
));
}
}
if client.ping().await.is_ok() {
return Ok(());
}
if Instant::now() >= deadline {
return Err(eyre!(
"bee API at {base_url} did not respond within {timeout:?}; tail {} for the cause",
self.log_path.display()
));
}
tokio::time::sleep(HEALTH_POLL_INTERVAL).await;
}
}
pub async fn shutdown(mut self, grace: Duration) -> BeeStatus {
send_sigterm_pgroup(self.pgid);
if let Ok(Ok(s)) = tokio::time::timeout(grace, self.child.wait()).await {
return exit_status_to_bee_status(&s);
}
let _ = self.child.start_kill();
match self.child.wait().await {
Ok(s) => exit_status_to_bee_status(&s),
Err(e) => BeeStatus::UnknownExit(e.to_string()),
}
}
pub async fn shutdown_default(self) -> BeeStatus {
self.shutdown(DEFAULT_SHUTDOWN_GRACE).await
}
}
impl Drop for BeeSupervisor {
fn drop(&mut self) {
send_sigkill_pgroup(self.pgid);
}
}
fn spawn_pipe_pump<R>(pipe: R, writer: Arc<Mutex<BeeLogWriter>>, stream_label: &'static str)
where
R: tokio::io::AsyncRead + Unpin + Send + 'static,
{
tokio::spawn(async move {
let mut reader = BufReader::new(pipe);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
tracing::debug!("bee-supervisor: {stream_label} EOF");
break;
}
Ok(_) => {
let trimmed = line.trim_end_matches(['\n', '\r']);
let mut w = writer.lock().await;
if let Err(e) = w.write_line(trimmed.as_bytes()) {
tracing::warn!(
"bee-supervisor: rotating writer failed on {stream_label}: {e}"
);
break;
}
}
Err(e) => {
tracing::warn!("bee-supervisor: {stream_label} read error: {e}");
break;
}
}
}
});
}
fn exit_status_to_bee_status(s: &std::process::ExitStatus) -> BeeStatus {
if let Some(code) = s.code() {
return BeeStatus::Exited(code);
}
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(sig) = s.signal() {
return BeeStatus::Signaled(sig);
}
}
BeeStatus::UnknownExit(format!("{s:?}"))
}
#[cfg(unix)]
fn send_sigterm_pgroup(pgid: Option<i32>) {
if let Some(pgid) = pgid {
unsafe {
libc::kill(-pgid, libc::SIGTERM);
}
}
}
#[cfg(not(unix))]
fn send_sigterm_pgroup(_pgid: Option<i32>) {
}
#[cfg(unix)]
fn send_sigkill_pgroup(pgid: Option<i32>) {
if let Some(pgid) = pgid {
unsafe {
libc::kill(-pgid, libc::SIGKILL);
}
}
}
#[cfg(not(unix))]
fn send_sigkill_pgroup(_pgid: Option<i32>) {}
#[cfg(test)]
mod tests {
use super::*;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
#[test]
fn bee_status_label_running() {
assert_eq!(BeeStatus::Running.label(), "bee running");
}
#[test]
fn bee_status_label_exited_zero() {
assert_eq!(BeeStatus::Exited(0).label(), "bee exited cleanly");
}
#[test]
fn bee_status_label_exited_nonzero() {
assert_eq!(BeeStatus::Exited(2).label(), "bee exited (code 2)");
}
#[test]
fn bee_status_label_signaled() {
assert_eq!(BeeStatus::Signaled(15).label(), "bee killed (signal 15)");
}
#[test]
fn bee_status_is_running_only_for_running() {
assert!(BeeStatus::Running.is_running());
assert!(!BeeStatus::Exited(0).is_running());
assert!(!BeeStatus::Exited(1).is_running());
assert!(!BeeStatus::Signaled(9).is_running());
assert!(!BeeStatus::UnknownExit("oops".into()).is_running());
}
#[test]
fn exit_status_clean_exit_maps_to_exited_zero() {
let s = ExitStatus::from_raw(0);
assert_eq!(exit_status_to_bee_status(&s), BeeStatus::Exited(0));
}
#[test]
fn exit_status_nonzero_exit_preserves_code() {
let raw = 2_i32 << 8;
let s = ExitStatus::from_raw(raw);
assert_eq!(exit_status_to_bee_status(&s), BeeStatus::Exited(2));
}
#[test]
fn exit_status_signaled_maps_to_signaled() {
let s = ExitStatus::from_raw(15);
assert_eq!(exit_status_to_bee_status(&s), BeeStatus::Signaled(15));
}
#[tokio::test]
async fn spawn_rejects_missing_binary() {
let bogus = Path::new("/definitely/does/not/exist/bee");
let cfg = Path::new("/tmp"); let err = BeeSupervisor::spawn(bogus, cfg, BeeLogsConfig::default())
.err()
.expect("missing binary must error");
assert!(
err.to_string().contains("bee binary not found"),
"expected friendly error, got: {err}"
);
}
#[tokio::test]
async fn spawn_rejects_missing_config() {
let real = Path::new("/bin/true");
let bogus_cfg = Path::new("/definitely/does/not/exist/bee.yaml");
if !real.exists() {
return; }
let err = BeeSupervisor::spawn(real, bogus_cfg, BeeLogsConfig::default())
.err()
.expect("missing config must error");
assert!(
err.to_string().contains("bee config not found"),
"expected friendly error, got: {err}"
);
}
#[tokio::test]
async fn spawn_succeeds_with_real_paths_and_status_running() {
let bin = Path::new("/bin/sleep");
if !bin.exists() {
return;
}
let cfg = std::env::temp_dir();
let _ = (bin, cfg);
}
}