use crate::config::ProcessConfig;
use crate::error::{Error, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::process::Stdio;
use tokio::fs::File;
use tokio::process::{Child, Command};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Identifier of a managed process; an alias for [`Uuid`].
pub type ProcessId = Uuid;
/// Lifecycle state of a managed process.
///
/// Serde (de)serializes each variant under its lowercase name
/// (`rename_all = "lowercase"`), e.g. `"online"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessState {
/// A start has been requested and the command is about to be spawned.
Starting,
/// The command was spawned successfully.
Online,
/// A stop has been requested and is in progress.
Stopping,
/// Not running; also the initial state of a freshly created `Process`.
Stopped,
/// Spawning the command or polling its status failed.
Errored,
/// A restart has been requested.
Restarting,
}
/// Human-readable rendering of a [`ProcessState`]: the lowercase variant name.
impl std::fmt::Display for ProcessState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it verbatim (no padding/alignment is applied).
        let label = match self {
            ProcessState::Starting => "starting",
            ProcessState::Online => "online",
            ProcessState::Stopping => "stopping",
            ProcessState::Stopped => "stopped",
            ProcessState::Errored => "errored",
            ProcessState::Restarting => "restarting",
        };
        f.write_str(label)
    }
}
/// Serializable snapshot of a [`Process`], as produced by `Process::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessStatus {
/// Identifier of the process this snapshot describes.
pub id: ProcessId,
/// Name taken from the process configuration.
pub name: String,
/// Lifecycle state at the time the snapshot was taken.
pub state: ProcessState,
/// OS process id: the live child's id if available, otherwise the stored pid.
pub pid: Option<u32>,
/// Despite the name this is a timestamp, not a duration: it is a copy of
/// `Process::started_at`.
pub uptime: Option<DateTime<Utc>>,
/// Value of the process's restart counter.
pub restarts: u32,
/// Last CPU usage sample recorded via `Process::update_monitoring`
/// (unit not visible here — presumably percent; TODO confirm).
pub cpu_usage: f32,
/// Last memory usage sample recorded via `Process::update_monitoring`
/// (unit not visible here — presumably bytes; TODO confirm).
pub memory_usage: u64,
/// Exit code recorded when the child was last observed to exit, if any.
pub exit_code: Option<i32>,
/// Last recorded error message, if any.
pub error: Option<String>,
/// Namespace taken from the process configuration.
pub namespace: String,
/// Instance number, if one was assigned via `Process::set_instance`.
pub instance: Option<u32>,
/// Port, if one was assigned via `Process::set_assigned_port`.
pub assigned_port: Option<u16>,
}
/// A managed process: its configuration, lifecycle state and, while it is
/// running under this manager, the handle of the spawned child.
#[derive(Debug)]
pub struct Process {
/// Identifier; a random v4 UUID unless replaced via `set_id`.
pub id: ProcessId,
/// Configuration the process is started from (script, args, cwd, env, ...).
pub config: ProcessConfig,
/// Current lifecycle state.
pub state: ProcessState,
/// Handle of the spawned child, when this struct owns one.
pub child: Option<Child>,
/// When the process first entered the `Online` state; cleared when it is
/// stopped or observed to have exited.
pub started_at: Option<DateTime<Utc>>,
/// Number of restarts performed; incremented by `restart`.
pub restarts: u32,
/// Exit code from the most recently observed exit, if the platform reported one.
pub exit_code: Option<i32>,
/// Most recent error message (failed start or failed status check).
pub error: Option<String>,
/// Instance number, set externally via `set_instance`.
pub instance: Option<u32>,
/// Port assigned externally via `set_assigned_port`.
pub assigned_port: Option<u16>,
/// Externally supplied pid, used by `status` as a fallback when no live
/// child handle provides one.
pub stored_pid: Option<u32>,
/// Latest resource-usage sample.
pub monitoring: ProcessMonitoring,
}
/// Most recent resource-usage sample for a process.
///
/// The default value is all zeros with no update timestamp.
#[derive(Debug, Default)]
pub struct ProcessMonitoring {
/// CPU usage from the last sample (unit not visible here — presumably
/// percent; TODO confirm against the sampler).
pub cpu_usage: f32,
/// Memory usage from the last sample (unit not visible here — presumably
/// bytes; TODO confirm against the sampler).
pub memory_usage: u64,
/// When the sample was recorded; `None` until `Process::update_monitoring`
/// has been called.
pub last_update: Option<DateTime<Utc>>,
}
impl Process {
pub fn new(config: ProcessConfig) -> Self {
Self {
id: Uuid::new_v4(),
config,
state: ProcessState::Stopped,
child: None,
started_at: None,
restarts: 0,
exit_code: None,
error: None,
instance: None,
assigned_port: None,
stored_pid: None,
monitoring: ProcessMonitoring::default(),
}
}
pub fn status(&self) -> ProcessStatus {
ProcessStatus {
id: self.id,
name: self.config.name.clone(),
state: self.state,
pid: self.child.as_ref().and_then(|c| c.id()).or(self.stored_pid),
uptime: self.started_at,
restarts: self.restarts,
cpu_usage: self.monitoring.cpu_usage,
memory_usage: self.monitoring.memory_usage,
exit_code: self.exit_code,
error: self.error.clone(),
namespace: self.config.namespace.clone(),
instance: self.instance,
assigned_port: self.assigned_port,
}
}
pub fn is_running(&self) -> bool {
matches!(self.state, ProcessState::Online | ProcessState::Starting)
}
pub fn set_state(&mut self, state: ProcessState) {
self.state = state;
if state == ProcessState::Online && self.started_at.is_none() {
self.started_at = Some(Utc::now());
}
}
pub async fn start(&mut self) -> Result<()> {
self.start_with_logs(None, None).await
}
pub async fn start_with_logs(
&mut self,
out_log: Option<PathBuf>,
err_log: Option<PathBuf>,
) -> Result<()> {
if self.is_running() {
return Err(Error::ProcessAlreadyRunning(self.config.name.clone()));
}
info!("Starting process: {}", self.config.name);
self.set_state(ProcessState::Starting);
let mut cmd = Command::new(&self.config.script);
if !self.config.args.is_empty() {
cmd.args(&self.config.args);
}
if let Some(cwd) = &self.config.cwd {
cmd.current_dir(cwd);
}
if !self.config.env.is_empty() {
for (key, value) in &self.config.env {
cmd.env(key, value);
}
}
if let Some(out_path) = out_log {
let stdout_file = File::create(&out_path)
.await
.map_err(|e| Error::config(format!("Failed to create stdout log file: {}", e)))?;
cmd.stdout(stdout_file.into_std().await);
debug!("Redirecting stdout to: {:?}", out_path);
} else {
cmd.stdout(Stdio::piped());
}
if let Some(err_path) = err_log {
let stderr_file = File::create(&err_path)
.await
.map_err(|e| Error::config(format!("Failed to create stderr log file: {}", e)))?;
cmd.stderr(stderr_file.into_std().await);
debug!("Redirecting stderr to: {:?}", err_path);
} else {
cmd.stderr(Stdio::piped());
}
cmd.stdin(Stdio::null());
#[cfg(unix)]
{
#[allow(unused_imports)]
use std::os::unix::process::CommandExt;
cmd.process_group(0); }
#[cfg(windows)]
{
use std::os::windows::process::CommandExt;
cmd.creation_flags(0x00000008); }
match cmd.spawn() {
Ok(child) => {
info!(
"Process {} started with PID: {}",
self.config.name,
child.id().unwrap_or(0)
);
self.child = Some(child);
self.set_state(ProcessState::Online);
self.error = None;
debug!(
"Process {} detached and running independently",
self.config.name
);
Ok(())
}
Err(e) => {
error!("Failed to start process {}: {}", self.config.name, e);
self.set_state(ProcessState::Errored);
self.error = Some(format!("Failed to start: {}", e));
Err(Error::ProcessStartFailed {
name: self.config.name.clone(),
reason: e.to_string(),
})
}
}
}
pub async fn stop(&mut self) -> Result<()> {
if !self.is_running() {
return Ok(());
}
info!("Stopping process: {}", self.config.name);
self.set_state(ProcessState::Stopping);
if let Some(mut child) = self.child.take() {
#[cfg(unix)]
{
use nix::sys::signal::{self, Signal};
use nix::unistd::Pid;
if let Some(pid) = child.id() {
let pid = Pid::from_raw(pid as i32);
if let Err(e) = signal::kill(pid, Signal::SIGTERM) {
warn!(
"Failed to send SIGTERM to process {}: {}",
self.config.name, e
);
} else {
debug!("Sent SIGTERM to process {}", self.config.name);
}
}
}
let timeout = tokio::time::Duration::from_secs(10);
match tokio::time::timeout(timeout, child.wait()).await {
Ok(Ok(exit_status)) => {
info!(
"Process {} stopped gracefully with exit code: {:?}",
self.config.name,
exit_status.code()
);
self.exit_code = exit_status.code();
}
Ok(Err(e)) => {
error!(
"Error waiting for process {} to stop: {}",
self.config.name, e
);
return Err(Error::ProcessStopFailed {
name: self.config.name.clone(),
reason: e.to_string(),
});
}
Err(_) => {
warn!(
"Process {} did not stop gracefully, killing forcefully",
self.config.name
);
if let Err(e) = child.kill().await {
error!("Failed to kill process {}: {}", self.config.name, e);
return Err(Error::ProcessStopFailed {
name: self.config.name.clone(),
reason: e.to_string(),
});
}
if let Ok(exit_status) = child.wait().await {
self.exit_code = exit_status.code();
}
}
}
}
self.set_state(ProcessState::Stopped);
self.started_at = None;
Ok(())
}
pub async fn restart(&mut self) -> Result<()> {
info!("Restarting process: {}", self.config.name);
self.set_state(ProcessState::Restarting);
if self.is_running() {
self.stop().await?;
}
self.restarts += 1;
self.start().await
}
pub async fn check_status(&mut self) -> Result<bool> {
if let Some(child) = &mut self.child {
match child.try_wait() {
Ok(Some(exit_status)) => {
info!(
"Process {} exited with status: {:?}",
self.config.name,
exit_status.code()
);
self.exit_code = exit_status.code();
self.set_state(ProcessState::Stopped);
self.child = None;
self.started_at = None;
Ok(false)
}
Ok(None) => {
Ok(true)
}
Err(e) => {
error!("Error checking process {} status: {}", self.config.name, e);
self.set_state(ProcessState::Errored);
self.error = Some(format!("Status check failed: {}", e));
self.child = None;
Ok(false)
}
}
} else {
Ok(false)
}
}
pub fn pid(&self) -> Option<u32> {
self.child.as_ref().and_then(|c| c.id())
}
pub fn set_assigned_port(&mut self, port: Option<u16>) {
self.assigned_port = port;
}
pub fn set_instance(&mut self, instance: Option<u32>) {
self.instance = instance;
}
pub fn set_stored_pid(&mut self, pid: Option<u32>) {
self.stored_pid = pid;
}
pub fn set_id(&mut self, id: ProcessId) {
self.id = id;
}
pub fn update_monitoring(&mut self, cpu_usage: f32, memory_usage: u64) {
self.monitoring.cpu_usage = cpu_usage;
self.monitoring.memory_usage = memory_usage;
self.monitoring.last_update = Some(Utc::now());
}
pub fn uptime_seconds(&self) -> Option<i64> {
self.started_at
.map(|start| (Utc::now() - start).num_seconds())
}
pub fn should_auto_restart(&self) -> bool {
self.config.autorestart
&& self.state == ProcessState::Stopped
&& self.config.max_restarts > self.restarts
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ProcessConfig;
    use pretty_assertions::assert_eq;

    /// Configuration shared by most tests: `echo hello world` named "test-process".
    fn create_test_config() -> ProcessConfig {
        ProcessConfig::builder()
            .name("test-process")
            .script("echo")
            .args(vec!["hello", "world"])
            .build()
            .unwrap()
    }

    /// Fresh, never-started process built from the shared test configuration.
    fn new_test_process() -> Process {
        Process::new(create_test_config())
    }

    #[test]
    fn test_process_state_display() {
        let expected = [
            (ProcessState::Starting, "starting"),
            (ProcessState::Online, "online"),
            (ProcessState::Stopping, "stopping"),
            (ProcessState::Stopped, "stopped"),
            (ProcessState::Errored, "errored"),
            (ProcessState::Restarting, "restarting"),
        ];
        for (state, label) in expected.iter() {
            assert_eq!(state.to_string(), *label);
        }
    }

    #[test]
    fn test_process_state_serialization() {
        let json = serde_json::to_string(&ProcessState::Online).unwrap();
        assert_eq!(json, "\"online\"");

        let round_tripped: ProcessState = serde_json::from_str(&json).unwrap();
        assert_eq!(round_tripped, ProcessState::Online);
    }

    #[test]
    fn test_process_monitoring_default() {
        let ProcessMonitoring {
            cpu_usage,
            memory_usage,
            last_update,
        } = ProcessMonitoring::default();

        assert_eq!(cpu_usage, 0.0);
        assert_eq!(memory_usage, 0);
        assert!(last_update.is_none());
    }

    #[test]
    fn test_process_new() {
        let template = create_test_config();
        let fresh = Process::new(template.clone());

        // Configuration is carried over untouched.
        assert_eq!(fresh.config.name, "test-process");
        assert_eq!(fresh.config.script, "echo");
        assert_eq!(fresh.config.args, vec!["hello", "world"]);

        // Everything else starts out in its idle/empty form.
        assert_eq!(fresh.state, ProcessState::Stopped);
        assert!(fresh.child.is_none());
        assert!(fresh.started_at.is_none());
        assert_eq!(fresh.restarts, 0);
        assert!(fresh.exit_code.is_none());
        assert!(fresh.error.is_none());
        assert!(fresh.instance.is_none());
        assert!(fresh.assigned_port.is_none());
    }

    #[test]
    fn test_process_status() {
        let mut proc = new_test_process();
        proc.set_state(ProcessState::Online);
        proc.restarts = 5;
        proc.update_monitoring(25.5, 1024 * 1024);
        proc.set_assigned_port(Some(8080));
        proc.set_instance(Some(1));

        let snapshot = proc.status();
        assert_eq!(snapshot.name, "test-process");
        assert_eq!(snapshot.state, ProcessState::Online);
        assert_eq!(snapshot.restarts, 5);
        assert_eq!(snapshot.cpu_usage, 25.5);
        assert_eq!(snapshot.memory_usage, 1024 * 1024);
        assert_eq!(snapshot.assigned_port, Some(8080));
        assert_eq!(snapshot.instance, Some(1));
        assert_eq!(snapshot.namespace, "default");
    }

    #[test]
    fn test_process_is_running() {
        let mut proc = new_test_process();
        assert!(!proc.is_running());

        let cases = [
            (ProcessState::Starting, true),
            (ProcessState::Online, true),
            (ProcessState::Stopping, false),
            (ProcessState::Stopped, false),
            (ProcessState::Errored, false),
            (ProcessState::Restarting, false),
        ];
        for &(state, should_run) in cases.iter() {
            proc.set_state(state);
            assert_eq!(proc.is_running(), should_run);
        }
    }

    #[test]
    fn test_process_set_state() {
        let mut proc = new_test_process();
        assert!(proc.started_at.is_none());

        // First transition to Online stamps the start time.
        proc.set_state(ProcessState::Online);
        assert_eq!(proc.state, ProcessState::Online);
        assert!(proc.started_at.is_some());
        let first_stamp = proc.started_at;

        // A repeated Online transition must not overwrite it.
        proc.set_state(ProcessState::Online);
        assert_eq!(proc.started_at, first_stamp);

        // Neither does leaving the Online state through `set_state`.
        proc.set_state(ProcessState::Stopped);
        assert_eq!(proc.state, ProcessState::Stopped);
        assert_eq!(proc.started_at, first_stamp);
    }

    #[test]
    fn test_process_set_assigned_port() {
        let mut proc = new_test_process();
        assert_eq!(proc.assigned_port, None);

        proc.set_assigned_port(Some(8080));
        assert_eq!(proc.assigned_port, Some(8080));

        proc.set_assigned_port(None);
        assert_eq!(proc.assigned_port, None);
    }

    #[test]
    fn test_process_set_instance() {
        let mut proc = new_test_process();
        assert_eq!(proc.instance, None);

        proc.set_instance(Some(1));
        assert_eq!(proc.instance, Some(1));

        proc.set_instance(None);
        assert_eq!(proc.instance, None);
    }

    #[test]
    fn test_process_update_monitoring() {
        let mut proc = new_test_process();
        assert_eq!(proc.monitoring.cpu_usage, 0.0);
        assert_eq!(proc.monitoring.memory_usage, 0);
        assert!(proc.monitoring.last_update.is_none());

        proc.update_monitoring(50.5, 2048);

        let sample = &proc.monitoring;
        assert_eq!(sample.cpu_usage, 50.5);
        assert_eq!(sample.memory_usage, 2048);
        assert!(sample.last_update.is_some());
    }

    #[test]
    fn test_process_uptime_seconds() {
        let mut proc = new_test_process();
        assert!(proc.uptime_seconds().is_none());

        proc.set_state(ProcessState::Online);
        assert!(matches!(proc.uptime_seconds(), Some(secs) if secs >= 0));
    }

    #[test]
    fn test_process_should_auto_restart() {
        let mut cfg = create_test_config();
        cfg.autorestart = true;
        cfg.max_restarts = 5;
        let mut proc = Process::new(cfg);

        // Eligible: enabled, stopped, budget left.
        proc.set_state(ProcessState::Stopped);
        assert!(proc.should_auto_restart());

        // Budget exhausted.
        proc.restarts = 5;
        assert!(!proc.should_auto_restart());

        // Feature switched off.
        proc.restarts = 0;
        proc.config.autorestart = false;
        assert!(!proc.should_auto_restart());

        // Still running.
        proc.config.autorestart = true;
        proc.set_state(ProcessState::Online);
        assert!(!proc.should_auto_restart());
    }

    #[test]
    fn test_process_pid() {
        let proc = new_test_process();
        assert_eq!(proc.pid(), None);
    }

    #[tokio::test]
    async fn test_process_start_already_running() {
        let mut proc = new_test_process();
        proc.set_state(ProcessState::Online);

        let outcome = proc.start().await;
        assert!(matches!(outcome, Err(Error::ProcessAlreadyRunning(_))));
    }

    #[tokio::test]
    async fn test_process_start_invalid_command() {
        let mut proc = Process::new(
            ProcessConfig::builder()
                .name("test-invalid")
                .script("this-command-does-not-exist-12345")
                .build()
                .unwrap(),
        );

        let outcome = proc.start().await;
        assert!(matches!(outcome, Err(Error::ProcessStartFailed { .. })));
        assert_eq!(proc.state, ProcessState::Errored);
        assert!(proc.error.is_some());
    }

    #[tokio::test]
    async fn test_process_stop_not_running() {
        let mut proc = new_test_process();
        assert!(proc.stop().await.is_ok());
    }

    #[tokio::test]
    async fn test_process_check_status_no_child() {
        let mut proc = new_test_process();
        assert!(matches!(proc.check_status().await, Ok(false)));
    }

    #[test]
    fn test_process_status_serialization() {
        let original = new_test_process().status();

        let json = serde_json::to_string(&original).unwrap();
        let restored: ProcessStatus = serde_json::from_str(&json).unwrap();

        assert_eq!(original.name, restored.name);
        assert_eq!(original.state, restored.state);
        assert_eq!(original.restarts, restored.restarts);
    }

    #[test]
    fn test_restart_counter_increments() {
        let mut proc = new_test_process();
        assert_eq!(proc.restarts, 0);
        assert_eq!(proc.status().restarts, 0);

        // (increment, expected running total)
        let steps = [(1, 1), (1, 2), (3, 5)];
        for &(delta, total) in steps.iter() {
            proc.restarts += delta;
            assert_eq!(proc.restarts, total);
            assert_eq!(proc.status().restarts, total);
        }
    }

    #[test]
    fn test_restart_counter_in_status_display() {
        let mut proc = new_test_process();
        for expected_count in 0..=10 {
            proc.restarts = expected_count;
            let reported = proc.status().restarts;
            assert_eq!(
                reported, expected_count,
                "Restart count mismatch at iteration {}",
                expected_count
            );
        }
    }

    #[test]
    fn test_restart_counter_persistence() {
        let mut proc = new_test_process();
        proc.restarts = 7;

        let lifecycle = [
            ProcessState::Starting,
            ProcessState::Online,
            ProcessState::Stopping,
            ProcessState::Stopped,
        ];
        for &state in lifecycle.iter() {
            proc.set_state(state);
            assert_eq!(proc.restarts, 7);
        }
        assert_eq!(proc.status().restarts, 7);
    }
}