use super::AgentOrchestrator;
use crate::services::agent_orchestration::{AgentStatus, OrchestrationError, OrchestrationResult};
impl AgentOrchestrator {
pub async fn delete_agent(&self, agent_name: &str) -> OrchestrationResult<()> {
tracing::debug!(agent_name = %agent_name, "Deleting agent");
if let Ok(AgentStatus::Running { .. }) = self.get_status(agent_name).await {
tracing::debug!(agent_name = %agent_name, "Stopping running agent before deletion");
self.lifecycle.disable_agent(agent_name).await?;
}
self.db_service.remove_agent_service(agent_name).await?;
tracing::debug!(agent_name = %agent_name, "Agent deleted successfully");
Ok(())
}
pub async fn delete_all_agents(&self) -> OrchestrationResult<u64> {
tracing::info!("Deleting all agents");
let agents = self.list_all().await?;
let total_count = agents.len() as u64;
if total_count == 0 {
tracing::debug!("No agents to delete");
return Ok(0);
}
tracing::info!(count = %total_count, "Found agents to delete");
tracing::debug!("Disabling all running agents");
self.disable_all().await?;
let mut deleted_count = 0;
for (agent_id, _) in agents {
match self.delete_agent(&agent_id).await {
Ok(()) => {
deleted_count += 1;
},
Err(e) => {
tracing::error!(agent_id = %agent_id, error = %e, "Failed to delete agent");
},
}
}
tracing::info!(deleted = %deleted_count, total = %total_count, "Deleted agents");
Ok(deleted_count)
}
#[cfg(unix)]
pub async fn cleanup_orphaned_processes(&self) -> OrchestrationResult<()> {
tracing::debug!("Scanning for orphaned agent processes");
let output = std::process::Command::new("pgrep")
.arg("-f")
.arg("agent-worker")
.output()
.map_err(|e| {
OrchestrationError::ProcessSpawnFailed(format!("Failed to run pgrep: {e}"))
})?;
if !output.status.success() {
tracing::debug!("No orphaned agent processes found");
return Ok(());
}
let pids_str = String::from_utf8_lossy(&output.stdout);
self.process_orphaned_pids(&pids_str).await
}
#[cfg(windows)]
pub async fn cleanup_orphaned_processes(&self) -> OrchestrationResult<()> {
tracing::debug!("Scanning for orphaned agent processes");
let output = std::process::Command::new("tasklist")
.args(["/FI", "IMAGENAME eq systemprompt*", "/FO", "CSV", "/NH"])
.output()
.map_err(|e| {
OrchestrationError::ProcessSpawnFailed(format!("Failed to run tasklist: {e}"))
})?;
let stdout = String::from_utf8_lossy(&output.stdout);
if stdout.contains("INFO: No tasks") || stdout.trim().is_empty() {
tracing::debug!("No orphaned agent processes found");
return Ok(());
}
let mut pids = Vec::new();
for line in stdout.lines() {
let parts: Vec<&str> = line.split(',').collect();
if parts.len() >= 2 {
if let Ok(pid) = parts[1].trim_matches('"').parse::<u32>() {
pids.push(pid.to_string());
}
}
}
let pids_str = pids.join("\n");
self.process_orphaned_pids(&pids_str).await
}
async fn process_orphaned_pids(&self, pids_str: &str) -> OrchestrationResult<()> {
let mut registered = 0;
let mut failed = 0;
for line in pids_str.lines() {
if let Ok(pid) = line.trim().parse::<u32>() {
if self.is_pid_tracked(pid).await? {
continue;
}
if let Some((agent_id, port)) = Self::identify_orphaned_process(pid) {
tracing::debug!(
pid = %pid,
agent_id = %agent_id,
port = %port,
"Found orphaned process"
);
let name = self
.db_service
.get_agent_config(&agent_id)
.await
.map_or_else(|_| "unknown".to_string(), |config| config.name);
match self.db_service.register_agent(&name, pid, port).await {
Ok(service_id) => {
tracing::info!(
service_id = %service_id,
pid = %pid,
"Registered orphaned process as service"
);
registered += 1;
},
Err(e) => {
tracing::error!(
error = %e,
pid = %pid,
"Failed to register orphaned process"
);
failed += 1;
},
}
} else {
tracing::warn!(pid = %pid, "Could not identify agent for orphaned process");
failed += 1;
}
}
}
if registered > 0 {
tracing::info!(registered = %registered, "Registered orphaned processes");
}
if failed > 0 {
tracing::warn!(failed = %failed, "Failed to handle some processes");
}
Ok(())
}
pub(super) async fn is_pid_tracked(&self, pid: u32) -> OrchestrationResult<bool> {
let agents = self.db_service.list_all_agents().await?;
for (_, status) in agents {
if let AgentStatus::Running {
pid: tracked_pid, ..
} = status
{
if tracked_pid == pid {
return Ok(true);
}
}
}
Ok(false)
}
#[cfg(target_os = "linux")]
pub(super) fn identify_orphaned_process(pid: u32) -> Option<(String, u16)> {
let environ_path = format!("/proc/{}/environ", pid);
if let Ok(environ_data) = std::fs::read(&environ_path) {
let environ_str = String::from_utf8_lossy(&environ_data);
let mut agent_id = None;
let mut port = None;
for env_var in environ_str.split('\0') {
if env_var.starts_with("AGENT_ID=") || env_var.starts_with("AGENT_UUID=") {
agent_id = env_var.split('=').nth(1).map(ToString::to_string);
} else if env_var.starts_with("AGENT_PORT=") {
if let Some(port_str) = env_var.split('=').nth(1) {
port = port_str.parse::<u16>().ok();
}
}
}
if let (Some(id), Some(p)) = (agent_id, port) {
return Some((id, p));
}
}
None
}
#[cfg(not(target_os = "linux"))]
pub(super) fn identify_orphaned_process(pid: u32) -> Option<(String, u16)> {
let output = std::process::Command::new("ps")
.args(["-p", &pid.to_string(), "-o", "args="])
.output()
.ok();
if let Some(output) = output {
let cmd = String::from_utf8_lossy(&output.stdout);
let mut agent_name = None;
let mut port = None;
let args: Vec<&str> = cmd.split_whitespace().collect();
let mut iter = args.iter().peekable();
while let Some(arg) = iter.next() {
match *arg {
"--agent-name" => {
agent_name = iter.next().map(|s| s.to_string());
},
"--port" => {
port = iter.next().and_then(|s| s.parse::<u16>().ok());
},
_ => {},
}
}
if let (Some(name), Some(p)) = (agent_name, port) {
return Some((name, p));
}
}
None
}
}