use crate::commands::run::parallel::state;
use crate::commands::run::parallel::worker::{FinishedWorker, WorkerState, terminate_workers};
use crate::commands::run::parallel::workspace_cleanup::remove_workspace_best_effort;
use crate::git::WorkspaceSpec;
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::Duration;
/// Cleanup guard for a parallel run.
///
/// Holds the in-memory parallel state, the in-flight workers, and the registered workspaces.
/// Unless it has been marked completed, `cleanup` (which also runs from `Drop`) terminates the
/// in-flight workers, removes registered workspaces that are not blocked on push, drops
/// non-terminal worker records, and saves the state file.
pub(crate) struct ParallelCleanupGuard {
// Path handed to `state::save_state` when cleanup persists the state.
state_path: PathBuf,
// In-memory state; cleanup retains only terminal worker records before saving it.
state_file: state::ParallelStateFile,
// Workers registered via `register_worker`, keyed by task id; passed to `terminate_workers`.
in_flight: HashMap<String, WorkerState>,
// Workspaces registered via `register_workspace`, keyed by task id; candidates for removal.
workspaces: HashMap<String, WorkspaceSpec>,
// Root directory passed to `remove_workspace_best_effort` for each removed workspace.
workspace_root: PathBuf,
// Sending half of the worker-event channel; clones are handed out by `worker_event_sender`.
worker_events_tx: Sender<FinishedWorker>,
// Receiving half of the worker-event channel; read by the drain/wait methods.
worker_events_rx: Receiver<FinishedWorker>,
// When true, `cleanup` returns immediately. Set by `mark_completed` and after a cleanup pass.
completed: bool,
}
impl ParallelCleanupGuard {
pub fn new_simple(
state_path: PathBuf,
state_file: state::ParallelStateFile,
workspace_root: PathBuf,
) -> Self {
let (worker_events_tx, worker_events_rx) = mpsc::channel();
Self {
state_path,
state_file,
in_flight: HashMap::new(),
workspaces: HashMap::new(),
workspace_root,
worker_events_tx,
worker_events_rx,
completed: false,
}
}
pub fn mark_completed(&mut self) {
self.completed = true;
}
pub fn state_file_mut(&mut self) -> &mut state::ParallelStateFile {
&mut self.state_file
}
pub fn state_file(&self) -> &state::ParallelStateFile {
&self.state_file
}
pub fn in_flight(&self) -> &HashMap<String, WorkerState> {
&self.in_flight
}
pub fn worker_event_sender(&self) -> Sender<FinishedWorker> {
self.worker_events_tx.clone()
}
pub fn drain_finished_workers(&mut self) -> Vec<FinishedWorker> {
let mut finished = Vec::new();
while let Ok(event) = self.worker_events_rx.try_recv() {
finished.push(event);
}
finished
}
pub fn wait_for_finished_workers(&mut self, timeout: Duration) -> Vec<FinishedWorker> {
match self.worker_events_rx.recv_timeout(timeout) {
Ok(first) => {
let mut finished = Vec::new();
finished.push(first);
finished.extend(self.drain_finished_workers());
finished
}
Err(mpsc::RecvTimeoutError::Timeout) => Vec::new(),
Err(mpsc::RecvTimeoutError::Disconnected) => Vec::new(),
}
}
pub fn register_workspace(&mut self, task_id: String, spec: WorkspaceSpec) {
self.workspaces.insert(task_id, spec);
}
pub fn register_worker(&mut self, task_id: String, worker: WorkerState) {
self.in_flight.insert(task_id, worker);
}
pub fn remove_worker(&mut self, task_id: &str) -> Option<WorkerState> {
self.in_flight.remove(task_id)
}
pub fn cleanup(&mut self) -> Result<()> {
if self.completed {
return Ok(());
}
log::debug!("ParallelCleanupGuard: performing cleanup");
terminate_workers(&mut self.in_flight);
let blocked_task_ids: HashSet<String> = self
.state_file
.workers
.iter()
.filter(|worker| matches!(worker.lifecycle, state::WorkerLifecycle::BlockedPush))
.map(|worker| worker.task_id.trim().to_string())
.collect();
for (task_id, spec) in &self.workspaces {
if blocked_task_ids.contains(task_id.trim()) {
continue;
}
if spec.path.exists() {
remove_workspace_best_effort(&self.workspace_root, spec, "cleanup guard");
}
}
self.state_file
.workers
.retain(state::WorkerRecord::is_terminal);
if let Err(err) = state::save_state(&self.state_path, &self.state_file) {
log::warn!("Failed to save parallel state during cleanup: {:#}", err);
}
self.completed = true;
Ok(())
}
fn cleanup_best_effort(&mut self) {
if let Err(err) = self.cleanup() {
log::warn!("ParallelCleanupGuard: cleanup error: {:#}", err);
}
}
}
/// Dropping the guard triggers the best-effort cleanup pass.
impl Drop for ParallelCleanupGuard {
    /// Delegates to `cleanup_best_effort`, which logs cleanup errors instead of returning them.
    fn drop(&mut self) {
        Self::cleanup_best_effort(self);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::run::parallel::worker::start_worker_monitor;
    use crate::lock;
    use std::process::{Child, Command};
    use tempfile::TempDir;

    /// Builds a guard rooted in `temp`, with an empty `workspaces` directory and a fresh state.
    fn create_test_guard(temp: &TempDir) -> ParallelCleanupGuard {
        let workspace_root = temp.path().join("workspaces");
        std::fs::create_dir_all(&workspace_root).unwrap();
        let state_path = temp.path().join("state.json");
        let state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());
        ParallelCleanupGuard::new_simple(state_path, state_file, workspace_root)
    }

    /// Spawns a `sleep 10` child, registers it with the guard as the worker for `task_id`,
    /// and returns the child's pid.
    fn register_sleeping_worker(
        guard: &mut ParallelCleanupGuard,
        temp: &TempDir,
        task_id: &str,
    ) -> Result<u32> {
        let child: Child = Command::new("sleep").arg("10").spawn()?;
        let pid = child.id();
        let workspace_path = temp.path().join("workspaces").join(task_id);
        std::fs::create_dir_all(&workspace_path)?;
        let worker = start_worker_monitor(
            task_id,
            "Test task".to_string(),
            WorkspaceSpec {
                path: workspace_path,
                branch: "main".to_string(),
            },
            child,
            guard.worker_event_sender(),
        );
        guard.register_worker(task_id.to_string(), worker);
        Ok(pid)
    }

    /// Force-kills the process with the given pid, ignoring failures.
    #[cfg(unix)]
    fn kill_test_process(pid: u32) {
        // kill(2) interprets zero and negative pids as process groups, so a wrapping `as` cast
        // could signal far more than the one test child. Convert with a range check and skip
        // anything that is not a strictly positive pid_t.
        let target = libc::pid_t::try_from(pid).ok().filter(|&pid| pid > 0);
        if let Some(target) = target {
            // SAFETY: `kill` takes two plain integers and dereferences no pointers, so the
            // call cannot violate memory safety. The result is ignored because the process
            // may already have exited.
            unsafe {
                let _ = libc::kill(target, libc::SIGKILL);
            }
        }
    }

    /// Force-kills the process tree with the given pid, ignoring failures.
    #[cfg(windows)]
    fn kill_test_process(pid: u32) {
        let _ = Command::new("taskkill")
            .args(["/PID", &pid.to_string(), "/T", "/F"])
            .status();
    }

    /// No-op on platforms where no kill mechanism is wired up.
    #[cfg(all(not(unix), not(windows)))]
    fn kill_test_process(_pid: u32) {}

    /// Explicit cleanup terminates the worker process and drops its non-terminal record.
    #[test]
    fn guard_cleanup_kills_worker_and_clears_state() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_guard(&temp);
        let pid = register_sleeping_worker(&mut guard, &temp, "RQ-0001")?;
        let workspace_path = temp.path().join("workspaces").join("RQ-0001");
        guard
            .state_file_mut()
            .upsert_worker(state::WorkerRecord::new(
                "RQ-0001",
                workspace_path.clone(),
                "2026-02-20T00:00:00Z".to_string(),
            ));
        assert_eq!(
            lock::pid_is_running(pid),
            Some(true),
            "Worker should be running before cleanup"
        );
        guard.cleanup()?;
        let running = lock::pid_is_running(pid);
        assert!(
            running == Some(false) || running.is_none(),
            "Worker should be terminated after cleanup, got: {:?}",
            running
        );
        assert!(
            guard.state_file.workers.is_empty(),
            "workers should be empty after cleanup"
        );
        Ok(())
    }

    /// A guard marked completed must leave its worker running when dropped.
    #[test]
    fn guard_disarm_prevents_cleanup() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_guard(&temp);
        let pid = register_sleeping_worker(&mut guard, &temp, "RQ-0001")?;
        guard.mark_completed();
        drop(guard);
        assert_eq!(
            lock::pid_is_running(pid),
            Some(true),
            "Worker should still be running after disarmed drop"
        );
        // The disarmed guard did not stop the child, so the test has to.
        kill_test_process(pid);
        Ok(())
    }

    /// Calling cleanup a second time succeeds after the first pass terminated the worker.
    #[test]
    fn guard_cleanup_is_idempotent() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_guard(&temp);
        let pid = register_sleeping_worker(&mut guard, &temp, "RQ-0001")?;
        guard.cleanup()?;
        let running = lock::pid_is_running(pid);
        assert!(
            running == Some(false) || running.is_none(),
            "Worker should be terminated after first cleanup, got: {:?}",
            running
        );
        guard.cleanup()?;
        Ok(())
    }

    /// Dropping an armed guard terminates the worker without an explicit cleanup call.
    #[test]
    fn guard_cleanup_runs_on_drop() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_guard(&temp);
        let pid = register_sleeping_worker(&mut guard, &temp, "RQ-0001")?;
        assert_eq!(
            lock::pid_is_running(pid),
            Some(true),
            "Worker should be running before drop"
        );
        drop(guard);
        let running = lock::pid_is_running(pid);
        assert!(
            running == Some(false) || running.is_none(),
            "Worker should be terminated after guard drop, got: {:?}",
            running
        );
        Ok(())
    }

    /// Cleanup drops the still-running record but keeps the completed (terminal) one.
    #[test]
    fn guard_cleanup_retains_terminal_workers_for_status_retry() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_guard(&temp);
        let running_workspace = temp.path().join("workspaces").join("RQ-0001");
        let completed_workspace = temp.path().join("workspaces").join("RQ-0002");
        std::fs::create_dir_all(&running_workspace)?;
        std::fs::create_dir_all(&completed_workspace)?;
        guard
            .state_file_mut()
            .upsert_worker(state::WorkerRecord::new(
                "RQ-0001",
                running_workspace,
                "2026-02-20T00:00:00Z".to_string(),
            ));
        let mut completed = state::WorkerRecord::new(
            "RQ-0002",
            completed_workspace,
            "2026-02-20T00:00:00Z".to_string(),
        );
        completed.mark_completed("2026-02-20T00:01:00Z".to_string());
        guard.state_file_mut().upsert_worker(completed);
        guard.cleanup()?;
        assert_eq!(guard.state_file.workers.len(), 1);
        assert_eq!(guard.state_file.workers[0].task_id, "RQ-0002");
        assert!(guard.state_file.workers[0].is_terminal());
        Ok(())
    }

    /// A workspace whose worker is blocked on push survives cleanup, as does its record.
    #[test]
    fn guard_cleanup_retains_blocked_workspace_for_retry() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_guard(&temp);
        let blocked_workspace = temp.path().join("workspaces").join("RQ-0099");
        std::fs::create_dir_all(&blocked_workspace)?;
        guard.register_workspace(
            "RQ-0099".to_string(),
            WorkspaceSpec {
                path: blocked_workspace.clone(),
                branch: "main".to_string(),
            },
        );
        let mut blocked = state::WorkerRecord::new(
            "RQ-0099",
            blocked_workspace.clone(),
            "2026-02-20T00:00:00Z".to_string(),
        );
        blocked.mark_blocked("2026-02-20T00:05:00Z".to_string(), "blocked");
        guard.state_file_mut().upsert_worker(blocked);
        guard.cleanup()?;
        assert!(blocked_workspace.exists());
        assert_eq!(guard.state_file.workers.len(), 1);
        assert!(matches!(
            guard.state_file.workers[0].lifecycle,
            state::WorkerLifecycle::BlockedPush
        ));
        Ok(())
    }
}