use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::process::Child;
use tokio::sync::Mutex as AsyncMutex;
/// Kind of work a registered process is performing.
///
/// The registry stores one of these inside every [`ProcessInfo`] and uses the
/// variant to filter agent runs from Claude sessions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProcessType {
/// A run of an agent, identified by the agent's id and name.
AgentRun {
/// Identifier of the agent being run.
agent_id: i64,
/// Name of the agent being run.
agent_name: String,
},
/// A Claude session, identified by its session id.
ClaudeSession {
/// Session identifier; `get_claude_session_by_id` matches on this value.
session_id: String,
},
}
/// Serializable description of a registered process.
///
/// This is the value handed out by the registry's query methods; it carries
/// no handle to the process itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessInfo {
/// Key under which the process is stored in the registry.
pub run_id: i64,
/// Whether this entry is an agent run or a Claude session.
pub process_type: ProcessType,
/// Operating-system process id, used by the PID-based kill path.
pub pid: u32,
/// Timestamp taken with `Utc::now()` when the process was registered.
pub started_at: DateTime<Utc>,
/// Project path supplied at registration.
pub project_path: String,
/// Task string supplied at registration.
pub task: String,
/// Model string supplied at registration.
pub model: String,
}
/// Arguments accepted by `ProcessRegistry::register_process` and
/// `ProcessRegistry::register_sidecar_process`.
///
/// Every field is copied or moved unchanged into the resulting
/// [`ProcessInfo`]; `agent_id` and `agent_name` end up inside
/// `ProcessType::AgentRun`.
#[derive(Debug, Clone)]
pub struct ProcessRegistrationConfig {
/// Registry key for the new entry (chosen by the caller).
pub run_id: i64,
/// Identifier of the agent being run.
pub agent_id: i64,
/// Name of the agent being run.
pub agent_name: String,
/// Operating-system process id of the process being registered.
pub pid: u32,
/// Project path to record for the run.
pub project_path: String,
/// Task string to record for the run.
pub task: String,
/// Model string to record for the run.
pub model: String,
}
/// Registry entry: process metadata plus the shared state needed to control
/// the process and collect its output.
#[allow(dead_code)]
pub struct ProcessHandle {
/// Metadata describing the process.
pub info: ProcessInfo,
/// Child handle, if one is available. It is `None` for sidecar
/// registrations and is reset to `None` once the child has been observed
/// to exit.
pub child: Arc<Mutex<Option<Child>>>,
/// Output accumulated through `append_live_output`, one line per call.
pub live_output: Arc<Mutex<String>>,
}
/// In-memory table of tracked processes, keyed by run id.
pub struct ProcessRegistry {
// Map from run id to its handle, guarded by an async mutex so it can be
// locked from async methods.
processes: Arc<AsyncMutex<HashMap<i64, ProcessHandle>>>,
// Counter backing `generate_id`; guarded by a std mutex because it is only
// touched synchronously.
next_id: Arc<Mutex<i64>>,
}
impl ProcessRegistry {
/// Creates an empty registry.
///
/// Generated ids (see `generate_id`) start at 1,000,000.
pub fn new() -> Self {
    let processes = Arc::new(AsyncMutex::new(HashMap::new()));
    let next_id = Arc::new(Mutex::new(1_000_000));
    Self { processes, next_id }
}
/// Hands out the next id from the internal counter and advances it by one.
///
/// # Errors
///
/// Returns the poison message if the counter mutex is poisoned.
pub fn generate_id(&self) -> Result<i64, String> {
    let mut counter = self.next_id.lock().map_err(|e| e.to_string())?;
    let issued = *counter;
    *counter = issued + 1;
    Ok(issued)
}
/// Registers an agent run together with its child handle.
///
/// The entry is stored under `config.run_id`, typed as
/// `ProcessType::AgentRun`, and stamped with the current UTC time.
pub async fn register_process(
    &self,
    config: ProcessRegistrationConfig,
    child: Child,
) -> Result<(), String> {
    let run_id = config.run_id;
    let info = ProcessInfo {
        run_id,
        process_type: ProcessType::AgentRun {
            agent_id: config.agent_id,
            agent_name: config.agent_name,
        },
        pid: config.pid,
        started_at: Utc::now(),
        project_path: config.project_path,
        task: config.task,
        model: config.model,
    };
    self.register_process_internal(run_id, info, child).await
}
/// Registers an agent run for which no child handle is available.
///
/// The entry is identical to one created by `register_process` except that
/// its `child` slot is `None`, so only the PID-based kill path can act on it.
pub async fn register_sidecar_process(
    &self,
    config: ProcessRegistrationConfig,
) -> Result<(), String> {
    let run_id = config.run_id;
    let info = ProcessInfo {
        run_id,
        process_type: ProcessType::AgentRun {
            agent_id: config.agent_id,
            agent_name: config.agent_name,
        },
        pid: config.pid,
        started_at: Utc::now(),
        project_path: config.project_path,
        task: config.task,
        model: config.model,
    };
    let handle = ProcessHandle {
        info,
        child: Arc::new(Mutex::new(None)),
        live_output: Arc::new(Mutex::new(String::new())),
    };
    self.processes.lock().await.insert(run_id, handle);
    Ok(())
}
/// Registers a Claude session under a freshly generated run id.
///
/// The caller keeps shared ownership of `child`; the registry stores the same
/// `Arc`. Returns the generated run id.
///
/// # Errors
///
/// Fails if `generate_id` fails.
pub async fn register_claude_session(
    &self,
    session_id: String,
    pid: u32,
    project_path: String,
    task: String,
    model: String,
    child: Arc<Mutex<Option<Child>>>,
) -> Result<i64, String> {
    let run_id = self.generate_id()?;
    let handle = ProcessHandle {
        info: ProcessInfo {
            run_id,
            process_type: ProcessType::ClaudeSession { session_id },
            pid,
            started_at: Utc::now(),
            project_path,
            task,
            model,
        },
        child,
        live_output: Arc::new(Mutex::new(String::new())),
    };
    self.processes.lock().await.insert(run_id, handle);
    Ok(run_id)
}
/// Wraps `child` and `process_info` in a handle with an empty output buffer
/// and stores it under `run_id`, replacing any entry already stored there.
async fn register_process_internal(
    &self,
    run_id: i64,
    process_info: ProcessInfo,
    child: Child,
) -> Result<(), String> {
    let handle = ProcessHandle {
        info: process_info,
        child: Arc::new(Mutex::new(Some(child))),
        live_output: Arc::new(Mutex::new(String::new())),
    };
    let mut registry = self.processes.lock().await;
    registry.insert(run_id, handle);
    Ok(())
}
/// Returns a copy of the metadata of every registered Claude session.
pub async fn get_running_claude_sessions(&self) -> Result<Vec<ProcessInfo>, String> {
    let registry = self.processes.lock().await;
    let mut sessions = Vec::new();
    for handle in registry.values() {
        if matches!(handle.info.process_type, ProcessType::ClaudeSession { .. }) {
            sessions.push(handle.info.clone());
        }
    }
    Ok(sessions)
}
/// Looks up a registered Claude session by its session id.
///
/// Returns `Ok(None)` when no Claude session with that id is registered.
pub async fn get_claude_session_by_id(
    &self,
    session_id: &str,
) -> Result<Option<ProcessInfo>, String> {
    let registry = self.processes.lock().await;
    for handle in registry.values() {
        if let ProcessType::ClaudeSession { session_id: sid } = &handle.info.process_type {
            if sid == session_id {
                return Ok(Some(handle.info.clone()));
            }
        }
    }
    Ok(None)
}
/// Removes the entry for `run_id`, if any. Removing an unknown id is not an
/// error.
pub async fn unregister_process(&self, run_id: i64) -> Result<(), String> {
    let mut registry = self.processes.lock().await;
    let _removed = registry.remove(&run_id);
    drop(_removed);
    Ok(())
}
/// Best-effort, non-async variant of `unregister_process`.
///
/// The entry is removed only if the registry lock can be taken immediately;
/// if the lock is currently held the call does nothing. Either way it
/// returns `Ok(())`.
#[allow(dead_code)]
fn unregister_process_sync(&self, run_id: i64) -> Result<(), String> {
    match self.processes.try_lock() {
        Ok(mut registry) => {
            registry.remove(&run_id);
        }
        // Lock is busy: skip the removal rather than block.
        Err(_) => {}
    }
    Ok(())
}
/// Returns a copy of the metadata of every registered process, regardless of
/// its type.
#[allow(dead_code)]
pub async fn get_running_processes(&self) -> Result<Vec<ProcessInfo>, String> {
    let registry = self.processes.lock().await;
    let mut infos = Vec::with_capacity(registry.len());
    for handle in registry.values() {
        infos.push(handle.info.clone());
    }
    Ok(infos)
}
/// Returns a copy of the metadata of every registered agent run.
pub async fn get_running_agent_processes(&self) -> Result<Vec<ProcessInfo>, String> {
    let registry = self.processes.lock().await;
    let agents: Vec<ProcessInfo> = registry
        .values()
        .filter(|handle| matches!(handle.info.process_type, ProcessType::AgentRun { .. }))
        .map(|handle| handle.info.clone())
        .collect();
    Ok(agents)
}
/// Returns a copy of the metadata stored under `run_id`, or `Ok(None)` if the
/// id is not registered.
#[allow(dead_code)]
pub async fn get_process(&self, run_id: i64) -> Result<Option<ProcessInfo>, String> {
    let registry = self.processes.lock().await;
    let found = registry.get(&run_id).map(|handle| &handle.info).cloned();
    Ok(found)
}
/// Stops the process registered under `run_id` and removes it from the
/// registry.
///
/// Steps, in order:
/// 1. Look up the PID and child handle; an unknown `run_id` yields
///    `Ok(false)`.
/// 2. Call `start_kill` on the child handle if there is one.
/// 3. If no kill was sent (no handle, or `start_kill` failed), fall back to
///    `kill_process_by_pid`; when that succeeds this returns `Ok(true)`
///    immediately (the fallback unregisters the entry itself).
/// 4. Poll the child with `try_wait` every 100 ms for at most 5 seconds.
/// 5. On timeout, drop the child handle and try `kill_process_by_pid` once
///    more, ignoring its result.
/// 6. Unregister the entry and return `Ok(true)`.
///
/// # Errors
///
/// Returns an error if the child mutex is poisoned when sending the kill.
/// Errors that occur while waiting are logged and do not fail the call.
pub async fn kill_process(&self, run_id: i64) -> Result<bool, String> {
use tracing::{error, info, warn};
// Copy out what we need so the registry lock is released before any
// signalling or waiting happens.
let (pid, child_arc) = {
let processes = self.processes.lock().await;
if let Some(handle) = processes.get(&run_id) {
(handle.info.pid, handle.child.clone())
} else {
warn!("Process {} not found in registry", run_id);
return Ok(false);
}
};
info!(
"Attempting graceful shutdown of process {} (PID: {})",
run_id, pid
);
// `kill_sent` is true only when `start_kill` succeeded on a real child
// handle. The std mutex guard lives only inside this block.
let kill_sent = {
let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
if let Some(child) = child_guard.as_mut() {
match child.start_kill() {
Ok(_) => {
info!("Successfully sent kill signal to process {}", run_id);
true
}
Err(e) => {
error!("Failed to send kill signal to process {}: {}", run_id, e);
false
}
}
} else {
warn!(
"No child handle available for process {} (PID: {}), attempting system kill",
run_id, pid
);
false
}
};
if !kill_sent {
info!(
"Attempting fallback kill for process {} (PID: {})",
run_id, pid
);
// A successful PID kill already unregistered the entry, so return
// right away. Failures fall through to the wait/unregister logic.
match self.kill_process_by_pid(run_id, pid).await {
Ok(true) => return Ok(true),
Ok(false) => warn!(
"Fallback kill also failed for process {} (PID: {})",
run_id, pid
),
Err(e) => error!("Error during fallback kill: {}", e),
}
}
// Poll for exit for up to 5 seconds.
let wait_result = tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
loop {
// The guard is confined to this block so it is released before the
// `sleep(...).await` below.
let status = {
let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
if let Some(child) = child_guard.as_mut() {
match child.try_wait() {
Ok(Some(status)) => {
info!("Process {} exited with status: {:?}", run_id, status);
*child_guard = None;
Some(Ok::<(), String>(()))
}
// Still running: keep polling.
Ok(None) => None,
Err(e) => {
error!("Error checking process status: {}", e);
Some(Err(e.to_string()))
}
}
} else {
// No child handle to wait on: treat the wait as complete.
Some(Ok(()))
}
};
match status {
Some(result) => return result,
None => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
}
})
.await;
match wait_result {
Ok(Ok(_)) => {
info!("Process {} exited gracefully", run_id);
}
Ok(Err(e)) => {
error!("Error waiting for process {}: {}", run_id, e);
}
// Timed out: discard the handle and make one last attempt by PID.
Err(_) => {
warn!("Process {} didn't exit within 5 seconds after kill", run_id);
if let Ok(mut child_guard) = child_arc.lock() {
*child_guard = None;
}
let _ = self.kill_process_by_pid(run_id, pid).await;
}
}
// The entry is removed whichever branch was taken above.
self.unregister_process(run_id).await?;
Ok(true)
}
/// Kills the operating-system process `pid` with an external command and, on
/// success, removes `run_id` from the registry.
///
/// * Windows: runs `taskkill /F /PID <pid>`.
/// * Other platforms: sends `SIGTERM` with `kill`, waits two seconds, probes
///   the process with `kill -0`, and sends `SIGKILL` if it is still alive.
///   If `SIGTERM` cannot be delivered, `SIGKILL` is tried directly.
///
/// PID 0 is rejected without running any command: on POSIX systems
/// `kill <sig> 0` targets every process in the caller's process group, which
/// would include this application itself.
///
/// Returns `Ok(true)` when the final command reported success, `Ok(false)`
/// when it did not (or when `pid` is 0).
///
/// # Errors
///
/// Returns an error if the kill command could not be executed at all.
pub async fn kill_process_by_pid(&self, run_id: i64, pid: u32) -> Result<bool, String> {
    use tracing::{error, info, warn};
    // Never hand PID 0 to `kill`/`taskkill`: it does not name a single
    // process and would signal our own process group on Unix.
    if pid == 0 {
        warn!(
            "Refusing to kill process {} by PID: PID 0 is not a valid target",
            run_id
        );
        return Ok(false);
    }
    info!("Attempting to kill process {} by PID {}", run_id, pid);
    let pid_str = pid.to_string();
    let kill_result = if cfg!(target_os = "windows") {
        crate::process::process_utils::trace_windows_command(
            "process_registry.kill_process_by_pid",
            "taskkill",
            ["/F", "/PID", pid_str.as_str()],
        );
        let mut command = std::process::Command::new("taskkill");
        crate::process::process_utils::hide_window_for_std_command(&mut command);
        command.args(["/F", "/PID", &pid_str]).output()
    } else {
        let term_result = std::process::Command::new("kill")
            .args(["-TERM", pid_str.as_str()])
            .output();
        match &term_result {
            Ok(output) if output.status.success() => {
                info!("Sent SIGTERM to PID {}", pid);
                // Give the process a moment to shut down on its own.
                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
                // `kill -0` sends no signal; it only reports whether the
                // process can still be signalled.
                let check_result = std::process::Command::new("kill")
                    .args(["-0", pid_str.as_str()])
                    .output();
                if let Ok(output) = check_result {
                    if output.status.success() {
                        warn!(
                            "Process {} still running after SIGTERM, sending SIGKILL",
                            pid
                        );
                        std::process::Command::new("kill")
                            .args(["-KILL", pid_str.as_str()])
                            .output()
                    } else {
                        // Process is gone: report the successful SIGTERM.
                        term_result
                    }
                } else {
                    // Could not probe: fall back to the SIGTERM outcome.
                    term_result
                }
            }
            _ => {
                warn!("SIGTERM failed for PID {}, trying SIGKILL", pid);
                std::process::Command::new("kill")
                    .args(["-KILL", pid_str.as_str()])
                    .output()
            }
        }
    };
    match kill_result {
        Ok(output) => {
            if output.status.success() {
                info!("Successfully killed process with PID {}", pid);
                self.unregister_process(run_id).await?;
                Ok(true)
            } else {
                let error_msg = String::from_utf8_lossy(&output.stderr);
                warn!("Failed to kill PID {}: {}", pid, error_msg);
                Ok(false)
            }
        }
        Err(e) => {
            error!("Failed to execute kill command for PID {}: {}", pid, e);
            Err(format!("Failed to execute kill command: {}", e))
        }
    }
}
#[allow(dead_code)]
pub async fn is_process_running(&self, run_id: i64) -> Result<bool, String> {
let processes = self.processes.lock().await;
if let Some(handle) = processes.get(&run_id) {
let child_arc = handle.child.clone();
drop(processes);
let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
if let Some(ref mut child) = child_guard.as_mut() {
match child.try_wait() {
Ok(Some(_)) => {
*child_guard = None;
Ok(false)
}
Ok(None) => Ok(true),
Err(_) => {
*child_guard = None;
Ok(false)
}
}
} else {
Ok(false)
}
} else {
Ok(false)
}
}
/// Appends `output` followed by a newline to the live-output buffer of
/// `run_id`. Unknown ids are ignored.
///
/// # Errors
///
/// Returns the poison message if the output mutex is poisoned.
pub async fn append_live_output(&self, run_id: i64, output: &str) -> Result<(), String> {
    let registry = self.processes.lock().await;
    let handle = match registry.get(&run_id) {
        Some(handle) => handle,
        None => return Ok(()),
    };
    let mut buffer = handle.live_output.lock().map_err(|e| e.to_string())?;
    buffer.push_str(output);
    buffer.push('\n');
    Ok(())
}
/// Returns a copy of everything appended so far to the live-output buffer of
/// `run_id`, or an empty string if the id is not registered.
///
/// # Errors
///
/// Returns the poison message if the output mutex is poisoned.
pub async fn get_live_output(&self, run_id: i64) -> Result<String, String> {
    let registry = self.processes.lock().await;
    let snapshot = match registry.get(&run_id) {
        Some(handle) => handle
            .live_output
            .lock()
            .map_err(|e| e.to_string())?
            .clone(),
        None => String::new(),
    };
    Ok(snapshot)
}
/// Removes every entry for which `is_process_running` reports `false` and
/// returns the removed run ids.
///
/// The registry lock is not held while the individual processes are checked;
/// it is taken once to snapshot the ids and once more to remove the finished
/// ones.
///
/// # Errors
///
/// Propagates any error from `is_process_running`.
#[allow(dead_code)]
pub async fn cleanup_finished_processes(&self) -> Result<Vec<i64>, String> {
    let candidates: Vec<i64> = {
        let registry = self.processes.lock().await;
        registry.keys().copied().collect()
    };
    let mut finished_runs = Vec::new();
    for run_id in candidates {
        let running = self.is_process_running(run_id).await?;
        if !running {
            finished_runs.push(run_id);
        }
    }
    let mut registry = self.processes.lock().await;
    for run_id in finished_runs.iter() {
        registry.remove(run_id);
    }
    drop(registry);
    Ok(finished_runs)
}
}
impl Default for ProcessRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Output appended line by line comes back as one newline-terminated
    /// string, in the order it was appended.
    #[tokio::test]
    async fn test_append_and_get_live_output() {
        let registry = ProcessRegistry::new();
        // A session without a child handle is enough for output buffering.
        let registration = registry.register_claude_session(
            "session-1".to_string(),
            1234,
            "/tmp/project".to_string(),
            "task".to_string(),
            "model".to_string(),
            Arc::new(Mutex::new(None)),
        );
        let run_id = registration.await.unwrap();

        for line in ["line1", "line2"] {
            registry.append_live_output(run_id, line).await.unwrap();
        }

        let captured = registry.get_live_output(run_id).await.unwrap();
        assert_eq!(captured, "line1\nline2\n");
    }
}