use super::*;
use crate::fs::{Fs, MemFs};
use std::path::Path;
/// Opens `path` on `fs` with the write and create options set and writes all of `bytes` to it.
///
/// Panics if either the open or the write fails.
fn write_file(fs: &MemFs, path: &Path, bytes: &[u8]) {
    use std::io::Write;

    let open_opts = crate::fs::FsOpenOptions::new().write(true).create(true);
    let mut handle = fs.open(path, &open_opts).unwrap();
    handle.write_all(bytes).unwrap();
}
/// One-shot rendezvous built on a process-wide slot holding an `mpsc::Receiver<()>`.
///
/// `arm` installs a fresh receiver and hands back the matching sender; `wait` removes
/// whatever receiver is installed and blocks on it. With nothing armed, `wait` is a no-op.
///
/// NOTE(review): the caller of `wait` is not visible in this file — presumably the
/// pause-release path in the parent module; confirm there.
pub(super) mod drain_barrier {
    use std::sync::mpsc;
    use std::sync::Mutex;

    /// Receiving half of the currently armed barrier, or `None` when unarmed.
    static CHANNEL: Mutex<Option<mpsc::Receiver<()>>> = Mutex::new(None);

    /// Arms the barrier and returns the sender that releases it.
    ///
    /// Any receiver installed by an earlier `arm` call is replaced (and dropped).
    pub fn arm() -> mpsc::Sender<()> {
        let (release_tx, release_rx) = mpsc::channel();
        {
            let mut slot = CHANNEL.lock().unwrap();
            *slot = Some(release_rx);
        }
        release_tx
    }

    /// Blocks until the armed barrier is released, consuming it.
    ///
    /// Returns immediately if nothing is armed. Also returns if the sender was dropped
    /// without sending, since the `recv` error is deliberately ignored.
    pub fn wait() {
        // Take the receiver inside its own scope so the mutex is unlocked before
        // blocking in `recv`; otherwise a concurrent `arm` would be stuck on the lock.
        let armed = {
            let mut slot = CHANNEL.lock().unwrap();
            slot.take()
        };
        let release_rx = match armed {
            Some(rx) => rx,
            None => return,
        };
        let _ = release_rx.recv();
    }
}
/// A removal accepted by `try_enqueue` while a pause guard is held must leave the file
/// in place, and the file must be gone once that guard has been dropped.
#[test]
fn deletion_pause_defers_then_executes_removal() {
    // Fixture: in-memory filesystem containing a single file.
    let fs = MemFs::new();
    let dir = Path::new("/d");
    fs.create_dir_all(dir).unwrap();
    let path = dir.join("file.sst");
    write_file(&fs, &path, b"sst");
    let dyn_fs: Arc<dyn Fs> = Arc::new(fs.clone());

    let pause = DeletionPause::new_shared();
    let guard = pause.acquire();

    // With the guard alive the pause accepts the request but the file stays.
    assert!(pause.try_enqueue(dyn_fs.clone(), path.clone()));
    let present_while_paused = fs.exists(&path).unwrap();
    assert!(present_while_paused, "file must still exist while paused");

    // Dropping the only guard ends the pause.
    drop(guard);
    let present_after_release = fs.exists(&path).unwrap();
    assert!(
        !present_after_release,
        "file must be removed after pause released"
    );
}
/// With no pause guard acquired, `try_enqueue` must return `false` and the file must
/// still exist afterwards.
#[test]
fn enqueue_returns_false_when_inactive() {
    // Fixture: in-memory filesystem containing a single file.
    let fs = MemFs::new();
    let dir = Path::new("/d");
    fs.create_dir_all(dir).unwrap();
    let path = dir.join("file.sst");
    write_file(&fs, &path, b"x");
    let dyn_fs: Arc<dyn Fs> = Arc::new(fs.clone());

    // No `acquire()` call anywhere in this test: the pause is never made active.
    let pause = DeletionPause::new_shared();

    assert!(!pause.try_enqueue(dyn_fs, path.clone()));
    assert!(fs.exists(&path).unwrap());
}
/// Race test: a removal enqueued under a *new* pause (thread B) must not be executed by
/// the release of an *older* pause (thread A) that finishes afterwards.
///
/// Sequence enforced by the channels below:
/// 1. A's guard is dropped on its own thread while the drain barrier is armed.
/// 2. This thread spins until `pause.active` reads 0.
/// 3. Thread B acquires a fresh guard, enqueues the file and parks.
/// 4. The barrier is released and thread A is joined; the file must still exist.
/// 5. Thread B is released and joined; the file must now be gone.
///
/// NOTE(review): this relies on the guard's drop path calling `drain_barrier::wait()`
/// after the active count reaches 0 and before it drains the queue — that code is not
/// visible in this file; confirm in the parent module.
#[test]
fn drain_does_not_steal_a_new_generation_queue() {
    use std::sync::mpsc;
    use std::thread;

    // Fixture: in-memory filesystem containing a single file.
    let fs = MemFs::new();
    let dir = Path::new("/d");
    fs.create_dir_all(dir).unwrap();
    let path = dir.join("race.sst");
    write_file(&fs, &path, b"keep-me");
    let dyn_fs: Arc<dyn Fs> = Arc::new(fs.clone());
    let pause = DeletionPause::new_shared();

    // Generation A: guard taken here, barrier armed before the guard can be dropped.
    let a = pause.acquire();
    let release_a_tx = drain_barrier::arm();

    // Handshake channels between this thread and the two workers.
    let (a_started_tx, a_started_rx) = mpsc::channel::<()>();
    let (b_enqueued_tx, b_enqueued_rx) = mpsc::channel::<()>();
    let (b_release_tx, b_release_rx) = mpsc::channel::<()>();

    // Thread A announces itself, then drops guard A followed by its handle on the pause.
    let a_pause = Arc::clone(&pause);
    let releaser = thread::spawn(move || {
        a_started_tx.send(()).unwrap();
        drop(a);
        drop(a_pause);
    });

    // Wait for thread A to start, then until the active count has fallen to zero.
    a_started_rx.recv().unwrap();
    while pause.active.load(Ordering::Acquire) != 0 {
        std::hint::spin_loop();
    }

    // Generation B: acquire, enqueue, report, then hold the guard until told to let go.
    let b_pause = Arc::clone(&pause);
    let b_fs = Arc::clone(&dyn_fs);
    let b_path = path.clone();
    let holder = thread::spawn(move || {
        let _b = b_pause.acquire();
        assert!(b_pause.try_enqueue(b_fs, b_path));
        b_enqueued_tx.send(()).unwrap();
        b_release_rx.recv().unwrap();
    });
    b_enqueued_rx.recv().unwrap();

    // Only now let thread A proceed past the barrier, and wait for it to finish.
    release_a_tx.send(()).unwrap();
    releaser.join().unwrap();

    let present_while_b_holds = fs.exists(&path).unwrap();
    assert!(
        present_while_b_holds,
        "file must survive while Thread B holds an active pause \
         (a's drain leaked into b's generation)",
    );

    // Let thread B drop its guard; after that the removal must have happened.
    b_release_tx.send(()).unwrap();
    holder.join().unwrap();

    let present_at_end = fs.exists(&path).unwrap();
    assert!(
        !present_at_end,
        "file should be removed after both pauses dropped",
    );
}
/// With two guards outstanding, dropping the first one must leave an enqueued file in
/// place; only dropping the last guard may remove it.
#[test]
fn nested_pauses_only_release_on_last_drop() {
    // Fixture: in-memory filesystem containing a single file.
    let fs = MemFs::new();
    let dir = Path::new("/d");
    fs.create_dir_all(dir).unwrap();
    let path = dir.join("file.sst");
    write_file(&fs, &path, b"x");
    let dyn_fs: Arc<dyn Fs> = Arc::new(fs.clone());

    let pause = DeletionPause::new_shared();
    let outer = pause.acquire();
    let inner = pause.acquire();
    assert!(pause.try_enqueue(dyn_fs, path.clone()));

    // One of the two guards goes away: the file must still be there.
    drop(inner);
    assert!(fs.exists(&path).unwrap(), "still paused by outer guard");

    // The last guard goes away: the file must be gone.
    drop(outer);
    let present_at_end = fs.exists(&path).unwrap();
    assert!(!present_at_end, "released after last guard dropped");
}