use crate::io::transport::{StdioTransport, Transport};
use async_trait::async_trait;
use std::io;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::task::JoinHandle;
use tracing::{error, info, trace};
#[cfg(windows)]
use tracing::warn;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopMode {
Graceful,
#[allow(dead_code)]
Force,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessState {
NotStarted,
Running { pid: u32 },
Stopped,
}
impl ProcessState {
pub fn pid(&self) -> Option<u32> {
match self {
ProcessState::Running { pid } => Some(*pid),
_ => None,
}
}
pub fn is_running(&self) -> bool {
matches!(self, ProcessState::Running { .. })
}
}
#[derive(Debug, Clone)]
pub struct ProcessExitEvent {}
#[async_trait]
pub trait ProcessExitHandler: Send + Sync {
async fn on_process_exit(&self, event: ProcessExitEvent);
}
pub trait StderrMonitor: Send + Sync {
fn on_stderr_line<F>(&mut self, handler: F)
where
F: Fn(String) + Send + Sync + 'static;
}
#[derive(Debug, thiserror::Error)]
pub enum ProcessError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("Process not started")]
NotStarted,
#[error("Process already started")]
AlreadyStarted,
#[error("Stdin not available")]
StdinNotAvailable,
#[error("Stdout not available")]
StdoutNotAvailable,
#[error("Stderr not available")]
StderrNotAvailable,
}
#[async_trait]
pub trait ProcessManager: Send + Sync {
type Error: std::error::Error + Send + Sync + 'static;
async fn start(&mut self) -> Result<(), Self::Error>;
async fn stop(&mut self, mode: StopMode) -> Result<(), Self::Error>;
fn is_running(&self) -> bool;
fn create_stdio_transport(&mut self) -> Result<StdioTransport, Self::Error>;
fn kill_sync(&mut self);
}
pub struct ChildProcessManager {
command: String,
args: Vec<String>,
working_directory: Option<PathBuf>,
state: Arc<Mutex<ProcessState>>,
stdio_transport: Option<StdioTransport>,
stderr_handler: Option<Box<dyn Fn(String) + Send + Sync>>,
stderr_task: Option<JoinHandle<()>>,
wait_task: Option<JoinHandle<()>>,
exit_handler: Option<Arc<dyn ProcessExitHandler>>,
}
impl ChildProcessManager {
pub fn new(command: String, args: Vec<String>, working_dir: Option<PathBuf>) -> Self {
Self {
command,
args,
working_directory: working_dir,
state: Arc::new(Mutex::new(ProcessState::NotStarted)),
stdio_transport: None,
stderr_handler: None,
stderr_task: None,
wait_task: None,
exit_handler: None,
}
}
pub fn get_state(&self) -> ProcessState {
self.state.lock().unwrap().clone()
}
async fn spawn_stderr_monitor_with_pipe(
&mut self,
stderr: tokio::process::ChildStderr,
) -> Result<(), ProcessError> {
if self.stderr_task.is_some() {
return Ok(());
}
let handler = self.stderr_handler.take();
let task = tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut line = String::new();
trace!(
"ChildProcessManager: Starting stderr monitoring (handler: {})",
if handler.is_some() {
"installed"
} else {
"draining only"
}
);
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
trace!("ChildProcessManager: stderr EOF reached");
break;
}
Ok(_) => {
let line_content = line.trim().to_string();
if !line_content.is_empty() {
if let Some(ref handler) = handler {
trace!("ChildProcessManager: stderr line: {}", line_content);
handler(line_content);
} else {
trace!("ChildProcessManager: stderr drained: {}", line_content);
}
}
}
Err(e) => {
error!("Failed to read from stderr: {}", e);
break;
}
}
}
trace!("ChildProcessManager: stderr monitoring finished");
});
self.stderr_task = Some(task);
Ok(())
}
async fn spawn_wait_task(&mut self, mut child: Child) -> Result<(), ProcessError> {
let current_pid = self.get_state().pid();
let exit_handler = self.exit_handler.clone();
let state = Arc::clone(&self.state);
let task = tokio::spawn(async move {
trace!(
"ChildProcessManager: Starting wait task for PID {:?}",
current_pid
);
match child.wait().await {
Ok(exit_status) => {
info!(
"Process PID {:?} exited with status: {}",
current_pid, exit_status
);
if let Ok(mut process_state) = state.lock() {
*process_state = ProcessState::Stopped;
}
if let Some(handler) = &exit_handler {
let event = ProcessExitEvent {};
handler.on_process_exit(event).await;
}
}
Err(e) => {
error!("Error waiting for child process: {}", e);
if let Ok(mut process_state) = state.lock() {
*process_state = ProcessState::Stopped;
}
if let Some(handler) = &exit_handler {
let event = ProcessExitEvent {};
handler.on_process_exit(event).await;
}
}
}
trace!(
"ChildProcessManager: Wait task finished for PID {:?}",
current_pid
);
});
self.wait_task = Some(task);
Ok(())
}
}
#[async_trait]
impl ProcessManager for ChildProcessManager {
type Error = ProcessError;
async fn start(&mut self) -> Result<(), Self::Error> {
if self.is_running() {
return Err(ProcessError::AlreadyStarted);
}
info!("Starting process: {} {:?}", self.command, self.args);
let mut command_builder = Command::new(&self.command);
command_builder
.args(&self.args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(working_dir) = &self.working_directory {
command_builder.current_dir(working_dir);
}
let mut child = command_builder.spawn()?;
let pid = child.id();
info!("Process started with PID: {:?}", pid);
if let Some(pid) = pid {
*self.state.lock().unwrap() = ProcessState::Running { pid };
} else {
return Err(ProcessError::Io(std::io::Error::other(
"Failed to get process ID",
)));
}
let stdin = child.stdin.take().ok_or(ProcessError::StdinNotAvailable)?;
let stdout = child
.stdout
.take()
.ok_or(ProcessError::StdoutNotAvailable)?;
let stderr = child
.stderr
.take()
.ok_or(ProcessError::StderrNotAvailable)?;
self.stdio_transport = Some(StdioTransport::new(stdin, stdout));
self.spawn_stderr_monitor_with_pipe(stderr).await?;
self.spawn_wait_task(child).await?;
Ok(())
}
async fn stop(&mut self, mode: StopMode) -> Result<(), Self::Error> {
let pid = match self.get_state().pid() {
Some(pid) => pid,
None => return Err(ProcessError::NotStarted),
};
match mode {
StopMode::Graceful => info!("Gracefully stopping process with PID: {}", pid),
StopMode::Force => info!("Force killing process with PID: {}", pid),
}
if let Some(mut transport) = self.stdio_transport.take() {
let _ = transport.close().await; }
#[cfg(unix)]
{
unsafe {
match mode {
StopMode::Graceful => {
if libc::kill(pid as libc::pid_t, libc::SIGTERM) == 0 {
info!("Sent SIGTERM to process {}", pid);
}
}
StopMode::Force => {
libc::kill(pid as libc::pid_t, libc::SIGKILL);
info!("Sent SIGKILL to process {}", pid);
}
}
}
}
#[cfg(not(unix))]
{
warn!("Windows process termination not fully implemented");
}
if let Some(task) = self.stderr_task.take() {
task.abort();
}
*self.state.lock().unwrap() = ProcessState::Stopped;
Ok(())
}
fn is_running(&self) -> bool {
self.get_state().is_running()
}
fn create_stdio_transport(&mut self) -> Result<StdioTransport, Self::Error> {
self.stdio_transport.take().ok_or(ProcessError::NotStarted)
}
fn kill_sync(&mut self) {
let pid = match self.get_state().pid() {
Some(pid) => pid,
None => return, };
info!("Synchronously force killing process with PID: {}", pid);
#[cfg(unix)]
{
unsafe {
libc::kill(pid as libc::pid_t, libc::SIGKILL);
info!("Sent SIGKILL to process {}", pid);
}
}
#[cfg(not(unix))]
{
warn!("Windows sync process kill not implemented - process may remain");
}
if let Some(task) = self.stderr_task.take() {
task.abort();
}
*self.state.lock().unwrap() = ProcessState::Stopped;
}
}
impl StderrMonitor for ChildProcessManager {
fn on_stderr_line<F>(&mut self, handler: F)
where
F: Fn(String) + Send + Sync + 'static,
{
self.stderr_handler = Some(Box::new(handler));
}
}
#[cfg(test)]
#[allow(dead_code)]
pub struct MockProcessManager {
running: bool,
process_id: Option<u32>,
}
#[cfg(test)]
#[allow(dead_code)]
impl MockProcessManager {
pub fn new() -> Self {
Self {
running: false,
process_id: None,
}
}
}
#[cfg(test)]
impl Default for MockProcessManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
#[async_trait]
impl ProcessManager for MockProcessManager {
type Error = ProcessError;
async fn start(&mut self) -> Result<(), Self::Error> {
if self.running {
return Err(ProcessError::AlreadyStarted);
}
self.running = true;
self.process_id = Some(12345); Ok(())
}
async fn stop(&mut self, _mode: StopMode) -> Result<(), Self::Error> {
self.running = false;
self.process_id = None;
Ok(())
}
fn is_running(&self) -> bool {
self.running
}
fn create_stdio_transport(&mut self) -> Result<StdioTransport, Self::Error> {
Err(ProcessError::NotStarted)
}
fn kill_sync(&mut self) {
self.running = false;
self.process_id = None;
}
}
#[cfg(test)]
impl StderrMonitor for MockProcessManager {
fn on_stderr_line<F>(&mut self, _callback: F)
where
F: Fn(String) + Send + Sync + 'static,
{
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
#[tokio::test]
async fn test_child_process_manager_lifecycle() {
let mut manager =
ChildProcessManager::new("echo".to_string(), vec!["hello".to_string()], None);
assert!(!manager.is_running());
manager.start().await.unwrap();
assert!(manager.is_running());
manager.stop(StopMode::Graceful).await.unwrap();
assert!(!manager.is_running());
}
#[tokio::test]
async fn test_stderr_monitoring() {
let mut manager = ChildProcessManager::new(
"sh".to_string(),
vec![
"-c".to_string(),
"echo 'error message' >&2; sleep 1".to_string(),
],
None,
);
let stderr_lines = Arc::new(Mutex::new(Vec::<String>::new()));
let stderr_lines_clone = Arc::clone(&stderr_lines);
manager.on_stderr_line(move |line| {
if let Ok(mut lines) = stderr_lines_clone.lock() {
lines.push(line);
}
});
manager.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
manager.stop(StopMode::Graceful).await.unwrap();
let lines = stderr_lines.lock().unwrap();
assert!(!lines.is_empty());
assert_eq!(lines[0], "error message");
}
#[tokio::test]
async fn test_process_state_transitions() {
let mut manager =
ChildProcessManager::new("echo".to_string(), vec!["hello".to_string()], None);
assert_eq!(manager.get_state(), ProcessState::NotStarted);
assert!(!manager.is_running());
manager.start().await.unwrap();
let running_state = manager.get_state();
assert!(matches!(running_state, ProcessState::Running { .. }));
assert!(manager.is_running());
manager.stop(StopMode::Graceful).await.unwrap();
assert_eq!(manager.get_state(), ProcessState::Stopped);
assert!(!manager.is_running());
}
#[tokio::test]
async fn test_invalid_operations() {
let mut manager =
ChildProcessManager::new("echo".to_string(), vec!["hello".to_string()], None);
let result = manager.stop(StopMode::Graceful).await;
assert!(matches!(result, Err(ProcessError::NotStarted)));
manager.start().await.unwrap();
let result = manager.start().await;
assert!(matches!(result, Err(ProcessError::AlreadyStarted)));
manager.stop(StopMode::Graceful).await.unwrap();
let result = manager.stop(StopMode::Graceful).await;
assert!(matches!(result, Err(ProcessError::NotStarted)));
}
#[tokio::test]
async fn test_create_transport_simple() {
let mut manager =
ChildProcessManager::new("echo".to_string(), vec!["hello".to_string()], None);
let result = manager.create_stdio_transport();
assert!(matches!(result, Err(ProcessError::NotStarted)));
manager.start().await.unwrap();
let _transport = manager.create_stdio_transport().unwrap();
let result = manager.create_stdio_transport();
assert!(matches!(result, Err(ProcessError::NotStarted)));
}
#[test]
fn test_process_state_methods() {
let not_started = ProcessState::NotStarted;
assert!(!not_started.is_running());
assert!(not_started.pid().is_none());
let running = ProcessState::Running { pid: 12345 };
assert!(running.is_running());
assert_eq!(running.pid(), Some(12345));
let stopped = ProcessState::Stopped;
assert!(!stopped.is_running());
assert!(stopped.pid().is_none());
}
}