use crate::commands::run::parallel::state;
use crate::commands::run::parallel::worker::{FinishedWorker, WorkerState, terminate_workers};
use crate::commands::run::parallel::workspace_cleanup::remove_workspace_best_effort;
use crate::git::WorkspaceSpec;
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::Duration;
/// Best-effort cleanup state for a parallel run.
///
/// Unless `mark_completed` was called, cleanup (run explicitly or via `Drop`)
/// terminates in-flight workers, removes their workspaces (except those whose
/// worker is in the `BlockedPush` lifecycle), prunes non-terminal worker
/// records, and persists the state file.
pub(crate) struct ParallelCleanupGuard {
    /// Path the parallel state file is saved to during cleanup.
    state_path: PathBuf,
    /// In-memory parallel run state; pruned and persisted during cleanup.
    state_file: state::ParallelStateFile,
    /// Workers still running, keyed by task id; terminated during cleanup.
    in_flight: HashMap<String, WorkerState>,
    /// Workspace specs registered per task id; removed during cleanup.
    workspaces: HashMap<String, WorkspaceSpec>,
    /// Root directory passed to workspace removal.
    workspace_root: PathBuf,
    /// Sender handed out (cloned) so workers can report completion.
    worker_events_tx: Sender<FinishedWorker>,
    /// Receiving end of the worker-completion channel.
    worker_events_rx: Receiver<FinishedWorker>,
    /// Set once cleanup has run (or the run finished normally);
    /// makes any later cleanup a no-op.
    completed: bool,
}
impl ParallelCleanupGuard {
    /// Builds a guard with empty worker/workspace registries and a fresh
    /// worker-event channel.
    pub fn new_simple(
        state_path: PathBuf,
        state_file: state::ParallelStateFile,
        workspace_root: PathBuf,
    ) -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            state_path,
            state_file,
            in_flight: HashMap::new(),
            workspaces: HashMap::new(),
            workspace_root,
            worker_events_tx: tx,
            worker_events_rx: rx,
            completed: false,
        }
    }

    /// Marks the run as finished so that subsequent cleanup is a no-op.
    pub fn mark_completed(&mut self) {
        self.completed = true;
    }

    /// Mutable access to the tracked parallel state file.
    pub fn state_file_mut(&mut self) -> &mut state::ParallelStateFile {
        &mut self.state_file
    }

    /// Shared access to the tracked parallel state file.
    pub fn state_file(&self) -> &state::ParallelStateFile {
        &self.state_file
    }

    /// The workers currently in flight, keyed by task id.
    pub fn in_flight(&self) -> &HashMap<String, WorkerState> {
        &self.in_flight
    }

    /// A new sender handle for reporting finished workers to this guard.
    pub fn worker_event_sender(&self) -> Sender<FinishedWorker> {
        Sender::clone(&self.worker_events_tx)
    }

    /// Collects every finished-worker event currently queued, without blocking.
    pub fn drain_finished_workers(&mut self) -> Vec<FinishedWorker> {
        // `try_recv` yields Err both when empty and when disconnected;
        // either way the drain simply stops.
        std::iter::from_fn(|| self.worker_events_rx.try_recv().ok()).collect()
    }

    /// Blocks up to `timeout` for at least one finished worker, then drains
    /// whatever else is already queued. Returns an empty vec on timeout or if
    /// all senders disconnected.
    pub fn wait_for_finished_workers(&mut self, timeout: Duration) -> Vec<FinishedWorker> {
        let Ok(first) = self.worker_events_rx.recv_timeout(timeout) else {
            return Vec::new();
        };
        let mut batch = vec![first];
        batch.extend(self.drain_finished_workers());
        batch
    }

    /// Records the workspace created for `task_id` so cleanup can remove it.
    pub fn register_workspace(&mut self, task_id: String, spec: WorkspaceSpec) {
        self.workspaces.insert(task_id, spec);
    }

    /// Records a spawned worker for `task_id` so cleanup can terminate it.
    pub fn register_worker(&mut self, task_id: String, worker: WorkerState) {
        self.in_flight.insert(task_id, worker);
    }

    /// Removes and returns the in-flight worker for `task_id`, if any.
    pub fn remove_worker(&mut self, task_id: &str) -> Option<WorkerState> {
        self.in_flight.remove(task_id)
    }

    /// Performs cleanup once: terminates in-flight workers, removes the
    /// workspaces of non-blocked tasks, prunes non-terminal worker records,
    /// and persists the state file (save failures are only logged).
    ///
    /// A second call, or a call after `mark_completed`, does nothing.
    pub fn cleanup(&mut self) -> Result<()> {
        if self.completed {
            return Ok(());
        }
        log::debug!("ParallelCleanupGuard: performing cleanup");
        terminate_workers(&mut self.in_flight);

        // Tasks whose worker is blocked on a push keep their workspace;
        // task ids are trimmed on both sides of the comparison.
        let mut blocked: HashSet<String> = HashSet::new();
        for worker in &self.state_file.workers {
            if matches!(worker.lifecycle, state::WorkerLifecycle::BlockedPush) {
                blocked.insert(worker.task_id.trim().to_string());
            }
        }

        for (task_id, spec) in self.workspaces.iter() {
            let keep = blocked.contains(task_id.trim());
            if !keep && spec.path.exists() {
                remove_workspace_best_effort(&self.workspace_root, spec, "cleanup guard");
            }
        }

        self.state_file
            .workers
            .retain(|record| record.is_terminal());
        if let Err(err) = state::save_state(&self.state_path, &self.state_file) {
            log::warn!("Failed to save parallel state during cleanup: {:#}", err);
        }
        self.completed = true;
        Ok(())
    }

    /// Runs `cleanup`, downgrading any error to a warning log.
    fn cleanup_best_effort(&mut self) {
        if let Err(err) = self.cleanup() {
            log::warn!("ParallelCleanupGuard: cleanup error: {:#}", err);
        }
    }
}
impl Drop for ParallelCleanupGuard {
    /// Guarantees cleanup runs even on early return or unwind; a guard
    /// whose run was `mark_completed` is a no-op here, and any cleanup
    /// error is only logged (never panics in drop).
    fn drop(&mut self) {
        self.cleanup_best_effort();
    }
}
// Unit tests live in the adjacent `tests` submodule file, compiled only
// for test builds.
#[cfg(test)]
mod tests;