use super::*;
use std::{
fs,
path::{Path, PathBuf},
sync::{
Arc, Condvar, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
mpsc,
},
thread,
time::{Duration, Instant},
};
const PROGRESS_SEND_INTERVAL: Duration = Duration::from_millis(80);
pub(in crate::app::jobs) struct TrashPool {
shared: Arc<TrashShared>,
workers: Vec<thread::JoinHandle<()>>,
}
struct TrashShared {
state: Mutex<TrashState>,
available: Condvar,
cancelled: AtomicBool,
cancel_token: AtomicU64,
}
struct TrashState {
pending: Option<TrashRequest>,
active: bool,
closed: bool,
}
impl TrashPool {
pub(in crate::app::jobs) fn new(result_tx: mpsc::Sender<JobResult>) -> Self {
let shared = Arc::new(TrashShared {
state: Mutex::new(TrashState {
pending: None,
active: false,
closed: false,
}),
available: Condvar::new(),
cancelled: AtomicBool::new(false),
cancel_token: AtomicU64::new(0),
});
let shared_worker = Arc::clone(&shared);
let worker = thread::spawn(move || {
while let Some(request) = TrashShared::pop(&shared_worker) {
TrashShared::set_active(&shared_worker, true);
let (completed, errors, stopped_early) = run_trash(
&request,
&result_tx,
&shared_worker.cancelled,
&shared_worker.cancel_token,
);
TrashShared::set_active(&shared_worker, false);
let verb = if request.permanent {
"Permanently deleted"
} else {
"Trashed"
};
let total = request.targets.len();
let single_name = (total == 1).then(|| request.targets[0].name.as_str());
let status = if stopped_early && errors.is_empty() {
match completed {
0 => "Trash cancelled".to_string(),
1 => format!("Trash cancelled — {verb} 1 item"),
n => format!("Trash cancelled — {verb} {n} items"),
}
} else if stopped_early {
let base = match completed {
0 => "Trash cancelled".to_string(),
1 => format!("Trash cancelled — {verb} 1 item"),
n => format!("Trash cancelled — {verb} {n} items"),
};
format!("{base}; {} error(s) — first: {}", errors.len(), errors[0])
} else if errors.is_empty() {
match (completed, single_name) {
(0, _) => "Nothing was deleted".to_string(),
(1, Some(name)) => format!("{verb} \"{name}\""),
(n, _) => format!("{verb} {n} items"),
}
} else if completed == 0 {
if errors.len() == 1 {
errors[0].clone()
} else {
format!("{} errors — first: {}", errors.len(), errors[0])
}
} else {
format!(
"{verb} {completed} item(s); {} error(s) — first: {}",
errors.len(),
errors[0]
)
};
if result_tx
.send(JobResult::Trash(TrashBuild {
token: request.token,
completed,
done: true,
status: Some(status),
}))
.is_err()
{
break;
}
}
});
Self {
shared,
workers: vec![worker],
}
}
pub(in crate::app::jobs) fn submit(&self, request: TrashRequest) -> bool {
let mut state = lock_unpoison(&self.shared.state);
if state.closed {
return false;
}
state.pending = Some(request);
self.shared.available.notify_one();
true
}
pub(in crate::app::jobs) fn cancel_trash(&self, token: u64) {
self.shared.cancel_token.store(token, Ordering::Relaxed);
}
pub(in crate::app::jobs) fn has_pending_work(&self) -> bool {
let state = lock_unpoison(&self.shared.state);
state.pending.is_some() || state.active
}
}
impl Drop for TrashPool {
fn drop(&mut self) {
{
let mut state = lock_unpoison(&self.shared.state);
state.closed = true;
}
self.shared.available.notify_all();
for worker in self.workers.drain(..) {
let _ = worker.join();
}
}
}
impl TrashShared {
fn pop(shared: &Arc<Self>) -> Option<TrashRequest> {
let mut state = lock_unpoison(&shared.state);
loop {
if let Some(request) = state.pending.take() {
return Some(request);
}
if state.closed {
return None;
}
state = wait_unpoison(&shared.available, state);
}
}
fn set_active(shared: &Arc<Self>, active: bool) {
lock_unpoison(&shared.state).active = active;
}
}
fn run_trash(
request: &TrashRequest,
result_tx: &mpsc::Sender<JobResult>,
cancelled: &AtomicBool,
cancel_token: &AtomicU64,
) -> (usize, Vec<String>, bool) {
if request.permanent {
run_permanent_delete(request, result_tx, cancelled, cancel_token)
} else {
run_trash_batch(request, cancelled, cancel_token)
}
}
fn run_permanent_delete(
request: &TrashRequest,
result_tx: &mpsc::Sender<JobResult>,
cancelled: &AtomicBool,
cancel_token: &AtomicU64,
) -> (usize, Vec<String>, bool) {
let staging = staging_dir();
let mut staged: Vec<(String, PathBuf)> = Vec::new();
let mut completed = 0usize;
let mut errors: Vec<String> = Vec::new();
let mut stopped_early = false;
let mut last_progress_at: Option<Instant> = None;
#[cfg(target_os = "macos")]
let mut deleted_names: Vec<&str> = Vec::new();
for target in &request.targets {
if cancelled.load(Ordering::Relaxed)
|| cancel_token.load(Ordering::Relaxed) == request.token
{
stopped_early = true;
break;
}
if target.is_dir {
match staging
.as_ref()
.and_then(|s| rename_into_staging(&target.path, s))
{
Some(staged_path) => {
staged.push((target.name.clone(), staged_path));
completed += 1;
#[cfg(target_os = "macos")]
deleted_names.push(target.name.as_str());
}
None => match fs::remove_dir_all(&target.path) {
Ok(()) => {
completed += 1;
#[cfg(target_os = "macos")]
deleted_names.push(target.name.as_str());
}
Err(e) => {
errors.push(format!("Could not delete \"{}\": {e}", target.name));
}
},
}
} else {
match fs::remove_file(&target.path) {
Ok(()) => {
completed += 1;
#[cfg(target_os = "macos")]
deleted_names.push(target.name.as_str());
}
Err(e) => errors.push(format!("Could not delete \"{}\": {e}", target.name)),
}
}
if !send_trash_progress(result_tx, request.token, completed, &mut last_progress_at) {
break;
}
}
let cleanup_errors = run_staged_cleanup(staged);
completed = completed.saturating_sub(cleanup_errors.len());
for name in cleanup_errors {
errors.push(format!("Could not delete \"{name}\": cleanup failed"));
}
#[cfg(target_os = "macos")]
if !deleted_names.is_empty() {
crate::fs::remove_restore_origins(&deleted_names);
}
(completed, errors, stopped_early)
}
fn staging_dir() -> Option<PathBuf> {
let pid = std::process::id();
dirs::data_dir().map(|d| d.join("elio").join("cleanup").join(pid.to_string()))
}
fn rename_into_staging(path: &Path, staging: &Path) -> Option<PathBuf> {
fs::create_dir_all(staging).ok()?;
let base = path.file_name().and_then(|n| n.to_str()).unwrap_or("dir");
let pid = std::process::id();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let dest = staging.join(format!("{base}-{pid}-{nanos}"));
fs::rename(path, &dest).ok()?;
Some(dest)
}
fn run_staged_cleanup(staged: Vec<(String, PathBuf)>) -> Vec<String> {
if staged.is_empty() {
return Vec::new();
}
let cap = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1)
.min(4)
.min(staged.len());
let (tx, rx) = mpsc::channel::<(String, PathBuf)>();
let rx = Arc::new(Mutex::new(rx));
let (err_tx, err_rx) = mpsc::channel::<String>();
let workers: Vec<_> = (0..cap)
.map(|_| {
let rx = Arc::clone(&rx);
let err_tx = err_tx.clone();
thread::spawn(move || {
while let Ok((name, path)) = rx.lock().unwrap().recv() {
if fs::remove_dir_all(&path).is_err() {
let _ = err_tx.send(name);
}
}
})
})
.collect();
drop(err_tx);
for item in staged {
let _ = tx.send(item);
}
drop(tx);
for w in workers {
let _ = w.join();
}
err_rx.iter().collect()
}
pub(in crate::app::jobs) fn sweep_staging_on_startup() {
let current_pid = std::process::id();
let Some(cleanup_root) = dirs::data_dir().map(|d| d.join("elio").join("cleanup")) else {
return;
};
let current_pid_dir = cleanup_root.join(current_pid.to_string());
if current_pid_dir.exists() {
let _ = fs::remove_dir_all(¤t_pid_dir);
}
thread::spawn(move || sweep_staging_dir(&cleanup_root, current_pid));
}
fn sweep_staging_dir(cleanup_root: &Path, current_pid: u32) {
let current_pid_str = current_pid.to_string();
let Ok(entries) = fs::read_dir(cleanup_root) else {
return;
};
for entry in entries.flatten() {
let p = entry.path();
let name = p.file_name().and_then(|n| n.to_str()).unwrap_or("");
if name == current_pid_str {
continue;
}
if let Ok(pid) = name.parse::<u32>()
&& pid_is_alive(pid)
{
continue;
}
if p.is_dir() {
let _ = fs::remove_dir_all(&p);
}
}
}
#[cfg(unix)]
fn pid_is_alive(pid: u32) -> bool {
let ret = unsafe { libc::kill(pid as libc::pid_t, 0) };
if ret == 0 {
return true;
}
std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
}
#[cfg(not(unix))]
fn pid_is_alive(_pid: u32) -> bool {
true
}
fn run_trash_batch(
request: &TrashRequest,
cancelled: &AtomicBool,
cancel_token: &AtomicU64,
) -> (usize, Vec<String>, bool) {
if cancelled.load(Ordering::Relaxed) || cancel_token.load(Ordering::Relaxed) == request.token {
return (0, Vec::new(), true);
}
let paths: Vec<_> = request.targets.iter().map(|t| &t.path).collect();
let total = paths.len();
match ::trash::delete_all(paths) {
Ok(()) => {
#[cfg(target_os = "macos")]
{
let origins: Vec<(String, std::path::PathBuf)> = request
.targets
.iter()
.map(|t| (t.name.clone(), t.path.clone()))
.collect();
crate::fs::save_restore_origins(&origins);
}
(total, Vec::new(), false)
}
Err(e) => (0, vec![e.to_string()], false),
}
}
fn send_trash_progress(
result_tx: &mpsc::Sender<JobResult>,
token: u64,
completed: usize,
last_progress_at: &mut Option<Instant>,
) -> bool {
let now = Instant::now();
let due = last_progress_at.is_none_or(|t| now.duration_since(t) >= PROGRESS_SEND_INTERVAL);
if due {
*last_progress_at = Some(now);
return result_tx
.send(JobResult::Trash(TrashBuild {
token,
completed,
done: false,
status: None,
}))
.is_ok();
}
true
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
fn make_tmp_dir(tag: &str) -> PathBuf {
let pid = std::process::id();
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let dir = std::env::temp_dir().join(format!("elio-test-{tag}-{pid}-{nanos}"));
fs::create_dir_all(&dir).unwrap();
dir
}
fn startup_sweep(cleanup_root: &Path, current_pid: u32) {
let current_pid_dir = cleanup_root.join(current_pid.to_string());
if current_pid_dir.exists() {
let _ = fs::remove_dir_all(¤t_pid_dir);
}
sweep_staging_dir(cleanup_root, current_pid);
}
#[cfg(unix)]
fn dead_pid() -> u32 {
let mut child = std::process::Command::new("true").spawn().unwrap();
let pid = child.id();
child.wait().unwrap();
pid
}
#[cfg(unix)]
#[test]
fn sweep_removes_dead_pid_subdirectory() {
let cleanup_root = make_tmp_dir("sweep-dead");
let current_pid = std::process::id();
let dead = dead_pid();
let stale_dir = cleanup_root.join(dead.to_string());
fs::create_dir_all(&stale_dir).unwrap();
fs::write(stale_dir.join("inner.txt"), b"hello").unwrap();
sweep_staging_dir(&cleanup_root, current_pid);
assert!(!stale_dir.exists(), "dead-pid subdir should be removed");
let _ = fs::remove_dir_all(&cleanup_root);
}
#[cfg(unix)]
#[test]
fn sweep_skips_live_other_pid_subdirectory() {
let cleanup_root = make_tmp_dir("sweep-live-other");
let current_pid = std::process::id();
let mut child = std::process::Command::new("sleep")
.arg("60")
.spawn()
.unwrap();
let other_pid = child.id();
let live_dir = cleanup_root.join(other_pid.to_string());
fs::create_dir_all(&live_dir).unwrap();
fs::write(live_dir.join("staged.tmp"), b"in-flight").unwrap();
sweep_staging_dir(&cleanup_root, current_pid);
child.kill().ok();
child.wait().ok();
assert!(
live_dir.exists(),
"live other-instance dir must not be swept"
);
let _ = fs::remove_dir_all(&cleanup_root);
}
#[test]
fn sweep_skips_current_pid_subdirectory() {
let cleanup_root = make_tmp_dir("sweep-skip");
let current_pid = std::process::id();
let live_dir = cleanup_root.join(current_pid.to_string());
fs::create_dir_all(&live_dir).unwrap();
fs::write(live_dir.join("staged.tmp"), b"live").unwrap();
sweep_staging_dir(&cleanup_root, current_pid);
assert!(live_dir.exists(), "current-pid subdir must not be swept");
let _ = fs::remove_dir_all(&cleanup_root);
}
#[test]
fn startup_sweep_reclaims_stale_dir_with_reused_pid() {
let cleanup_root = make_tmp_dir("sweep-pid-reuse");
let current_pid = std::process::id();
let stale_same_pid_dir = cleanup_root.join(current_pid.to_string());
fs::create_dir_all(&stale_same_pid_dir).unwrap();
fs::write(stale_same_pid_dir.join("leftover.tmp"), b"stale").unwrap();
startup_sweep(&cleanup_root, current_pid);
assert!(
!stale_same_pid_dir.exists(),
"stale dir with reused PID should be removed at startup"
);
let _ = fs::remove_dir_all(&cleanup_root);
}
#[test]
fn sweep_is_no_op_for_empty_cleanup_root() {
let cleanup_root = make_tmp_dir("sweep-empty");
sweep_staging_dir(&cleanup_root, std::process::id());
assert!(cleanup_root.exists());
let _ = fs::remove_dir_all(&cleanup_root);
}
#[test]
fn sweep_is_no_op_when_cleanup_root_does_not_exist() {
let parent = make_tmp_dir("sweep-absent-parent");
let nonexistent = parent.join("no-such-dir");
sweep_staging_dir(&nonexistent, std::process::id());
let _ = fs::remove_dir_all(&parent);
}
#[test]
fn staged_cleanup_succeeds_and_returns_no_errors() {
let staging = make_tmp_dir("cleanup-ok");
let dir = staging.join("to-delete");
fs::create_dir_all(&dir).unwrap();
fs::write(dir.join("file.txt"), b"x").unwrap();
let errors = run_staged_cleanup(vec![("to-delete".to_string(), dir.clone())]);
assert!(errors.is_empty());
assert!(!dir.exists());
let _ = fs::remove_dir_all(&staging);
}
#[test]
fn staged_cleanup_returns_name_on_failure() {
let parent = make_tmp_dir("cleanup-fail-parent");
let missing = parent.join("ghost-dir");
let errors = run_staged_cleanup(vec![("ghost-dir".to_string(), missing)]);
assert_eq!(errors, vec!["ghost-dir"]);
let _ = fs::remove_dir_all(&parent);
}
#[test]
fn staged_cleanup_reports_only_failed_entries() {
let staging = make_tmp_dir("cleanup-mixed");
let good = staging.join("good");
fs::create_dir_all(&good).unwrap();
let bad = staging.join("bad-ghost");
let errors = run_staged_cleanup(vec![
("good".to_string(), good.clone()),
("bad-ghost".to_string(), bad),
]);
assert_eq!(errors, vec!["bad-ghost"]);
assert!(!good.exists());
let _ = fs::remove_dir_all(&staging);
}
}