#![allow(dead_code)]
#![allow(unused_imports)]
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::SystemTime;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use bzzz_core::{RunId, RunStatus, RuntimeKind};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunState {
pub run_id: String,
pub status: RunStatus,
pub runtime_kind: RuntimeKind,
pub started_at: SystemTime,
pub pid: Option<u32>,
pub spec_path: PathBuf,
pub working_dir: PathBuf,
pub runtime_handle: String,
}
impl RunState {
pub fn new(
run_id: RunId,
runtime_kind: RuntimeKind,
spec_path: PathBuf,
working_dir: PathBuf,
) -> Self {
RunState {
run_id: run_id.as_str().to_string(),
status: RunStatus::Running,
runtime_kind,
started_at: SystemTime::now(),
pid: None,
spec_path,
working_dir,
runtime_handle: String::new(),
}
}
pub fn with_pid(mut self, pid: u32) -> Self {
self.pid = Some(pid);
self.runtime_handle = format!("pid:{}", pid);
self
}
pub fn with_pid_opt(mut self, pid: Option<u32>) -> Self {
if let Some(p) = pid {
self.pid = Some(p);
self.runtime_handle = format!("pid:{}", p);
}
self
}
pub fn id(&self) -> &str {
&self.run_id
}
pub fn is_process_alive(&self) -> bool {
match self.pid {
Some(pid) => is_process_running(pid),
None => false,
}
}
pub fn kill_process(&self, force: bool) -> Result<()> {
match self.pid {
Some(pid) => kill_process(pid, force),
None => anyhow::bail!("No PID to kill"),
}
}
}
pub struct RunRegistry {
state_dir: PathBuf,
}
impl RunRegistry {
pub fn new(state_dir: PathBuf) -> Self {
RunRegistry { state_dir }
}
pub fn default() -> Self {
let state_dir = std::env::current_dir()
.unwrap_or_else(|_| PathBuf::from("."))
.join(".bzzz")
.join("runs");
RunRegistry { state_dir }
}
fn ensure_dir(&self) -> Result<()> {
std::fs::create_dir_all(&self.state_dir)
.with_context(|| format!("Failed to create state dir: {:?}", self.state_dir))?;
Ok(())
}
fn state_file(&self, run_id: &str) -> PathBuf {
self.state_dir.join(format!("{}.json", run_id))
}
pub fn register(&self, state: &RunState) -> Result<()> {
self.ensure_dir()?;
let path = self.state_file(&state.run_id);
let content =
serde_json::to_string_pretty(state).with_context(|| "Failed to serialize run state")?;
std::fs::write(&path, content)
.with_context(|| format!("Failed to write state file: {:?}", path))?;
Ok(())
}
pub fn get(&self, run_id: &str) -> Result<Option<RunState>> {
let path = self.state_file(run_id);
if !path.exists() {
return Ok(None);
}
let content = std::fs::read_to_string(&path)
.with_context(|| format!("Failed to read state file: {:?}", path))?;
let state: RunState =
serde_json::from_str(&content).with_context(|| "Failed to parse run state")?;
Ok(Some(state))
}
pub fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
let mut state = self
.get(run_id)?
.ok_or_else(|| anyhow::anyhow!("Run not found: {}", run_id))?;
state.status = status;
self.register(&state)
}
pub fn mark_stopped(&self, run_id: &str, reason: &str) -> Result<()> {
let mut state = self
.get(run_id)?
.ok_or_else(|| anyhow::anyhow!("Run not found: {}", run_id))?;
state.status = RunStatus::Cancelled;
self.register(&state)?;
let reason_file = self.state_dir.join(format!("{}.reason", run_id));
std::fs::write(&reason_file, reason).ok();
Ok(())
}
pub fn list(&self, filter_status: Option<RunStatus>) -> Result<Vec<RunState>> {
if !self.state_dir.exists() {
return Ok(Vec::new());
}
let mut runs = Vec::new();
for entry in std::fs::read_dir(&self.state_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().map(|e| e == "json").unwrap_or(false) {
if let Ok(content) = std::fs::read_to_string(&path) {
if let Ok(state) = serde_json::from_str::<RunState>(&content) {
if let Some(filter) = filter_status {
if state.status != filter {
continue;
}
}
runs.push(state);
}
}
}
}
runs.sort_by(|a, b| b.started_at.cmp(&a.started_at));
Ok(runs)
}
pub fn cleanup_completed(&self, max_age_hours: u64) -> Result<usize> {
let runs = self.list(None)?;
let mut cleaned = 0;
let cutoff = std::time::Duration::from_secs(max_age_hours * 3600);
let now = SystemTime::now();
for run in runs {
if run.status == RunStatus::Completed
|| run.status == RunStatus::Failed
|| run.status == RunStatus::Cancelled
{
if let Ok(elapsed) = now.duration_since(run.started_at) {
if elapsed > cutoff {
let path = self.state_file(&run.run_id);
std::fs::remove_file(&path).ok();
cleaned += 1;
}
}
}
}
Ok(cleaned)
}
}
#[cfg(unix)]
fn is_process_running(pid: u32) -> bool {
Command::new("ps")
.arg("-p")
.arg(pid.to_string())
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
#[cfg(not(unix))]
fn is_process_running(_pid: u32) -> bool {
false
}
#[cfg(unix)]
fn kill_process(pid: u32, force: bool) -> Result<()> {
let signal = if force { "KILL" } else { "TERM" };
let status = Command::new("kill")
.arg(format!("-{}", signal))
.arg(pid.to_string())
.status()
.with_context(|| format!("Failed to execute kill command for pid {}", pid))?;
if !status.success() {
anyhow::bail!("kill command failed for pid {}", pid);
}
Ok(())
}
#[cfg(not(unix))]
fn kill_process(pid: u32, _force: bool) -> Result<()> {
anyhow::bail!("Process killing not supported on this platform");
}
#[cfg(test)]
mod tests {
use super::*;
fn uuid() -> String {
format!(
"{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
)
}
#[test]
fn test_run_state_creation() {
let run_id = RunId::new();
let state = RunState::new(
run_id.clone(),
RuntimeKind::Local,
PathBuf::from("agent.yaml"),
PathBuf::from("/tmp"),
);
assert_eq!(state.run_id, run_id.as_str());
assert_eq!(state.status, RunStatus::Running);
assert!(state.pid.is_none());
}
#[test]
fn test_run_state_with_pid() {
let run_id = RunId::new();
let state = RunState::new(
run_id,
RuntimeKind::Local,
PathBuf::from("agent.yaml"),
PathBuf::from("/tmp"),
)
.with_pid(12345);
assert_eq!(state.pid, Some(12345));
assert_eq!(state.runtime_handle, "pid:12345");
}
#[test]
fn test_registry_roundtrip() {
let temp_dir = std::env::temp_dir().join(format!("bzzz-registry-test-{}", uuid()));
std::fs::create_dir_all(&temp_dir).ok();
let registry = RunRegistry::new(temp_dir.clone());
let run_id = RunId::new();
let state = RunState::new(
run_id.clone(),
RuntimeKind::Local,
PathBuf::from("agent.yaml"),
PathBuf::from("/tmp"),
)
.with_pid(12345);
registry.register(&state).unwrap();
let retrieved = registry.get(run_id.as_str()).unwrap().unwrap();
assert_eq!(retrieved.run_id, state.run_id);
assert_eq!(retrieved.pid, state.pid);
std::fs::remove_dir_all(&temp_dir).ok();
}
#[test]
fn test_registry_list() {
let temp_dir =
std::env::temp_dir().join(format!("bzzz-registry-list-test-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir).ok();
let registry = RunRegistry::new(temp_dir.clone());
for i in 0..3 {
let run_id = RunId::new();
let state = RunState::new(
run_id,
RuntimeKind::Local,
PathBuf::from(format!("agent{}.yaml", i)),
PathBuf::from("/tmp"),
);
registry.register(&state).unwrap();
}
let runs = registry.list(None).unwrap();
assert_eq!(runs.len(), 3);
std::fs::remove_dir_all(&temp_dir).ok();
}
}