#![cfg(feature = "std")]
use crate::fs::Fs;
use std::{
path::PathBuf,
sync::{
Arc,
mpsc::{Receiver, Sender, channel},
},
thread::JoinHandle,
time::Duration,
};
struct DeleteJob {
fs: Arc<dyn Fs>,
path: PathBuf,
}
pub struct BackgroundDeleter {
sender: Option<Sender<DeleteJob>>,
worker: Option<JoinHandle<()>>,
}
impl core::fmt::Debug for BackgroundDeleter {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("BackgroundDeleter").finish_non_exhaustive()
}
}
impl BackgroundDeleter {
#[must_use]
pub fn new(min_interval: Option<Duration>) -> Self {
let (sender, receiver) = channel::<DeleteJob>();
let worker = std::thread::Builder::new()
.name("lsm-deleter".into())
.spawn(move || Self::run(&receiver, min_interval))
.ok();
Self {
sender: Some(sender),
worker,
}
}
pub fn enqueue(&self, fs: Arc<dyn Fs>, path: PathBuf) {
let job = DeleteJob { fs, path };
match &self.sender {
Some(sender) => {
if let Err(std::sync::mpsc::SendError(job)) = sender.send(job) {
Self::unlink_now(&job);
}
}
None => Self::unlink_now(&job),
}
}
fn unlink_now(job: &DeleteJob) {
if let Err(e) = job.fs.remove_file(&job.path) {
log::warn!(
"background deleter sync fallback failed to unlink {}: {e:?}",
job.path.display(),
);
}
}
fn run(receiver: &Receiver<DeleteJob>, min_interval: Option<Duration>) {
while let Ok(job) = receiver.recv() {
if let Err(e) = job.fs.remove_file(&job.path) {
log::warn!(
"background deleter failed to unlink {}: {e:?}",
job.path.display(),
);
}
if let Some(interval) = min_interval {
std::thread::sleep(interval);
}
}
}
}
impl Drop for BackgroundDeleter {
fn drop(&mut self) {
drop(self.sender.take());
if let Some(worker) = self.worker.take() {
let _ = worker.join();
}
}
}
#[cfg(test)]
#[expect(clippy::unwrap_used, reason = "test code")]
mod tests {
use super::*;
use crate::fs::{Fs, FsOpenOptions, MemFs};
use std::io::Write;
#[test]
fn drains_queued_deletions_on_drop() {
let fs: Arc<dyn Fs> = Arc::new(MemFs::default());
let paths: Vec<PathBuf> = (0..16).map(|i| PathBuf::from(format!("/f{i}"))).collect();
for p in &paths {
let mut f = fs
.open(p, &FsOpenOptions::new().write(true).create(true))
.unwrap();
f.write_all(b"data").unwrap();
f.flush().unwrap();
assert!(fs.open(p, &FsOpenOptions::new().read(true)).is_ok());
}
{
let deleter = BackgroundDeleter::new(None);
for p in &paths {
deleter.enqueue(Arc::clone(&fs), p.clone());
}
}
for p in &paths {
assert!(
fs.open(p, &FsOpenOptions::new().read(true)).is_err(),
"{} should have been unlinked by the background deleter",
p.display(),
);
}
}
}