use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::thread;
use std::time::Duration;
use crate::storage::file_format;
use crate::storage::memtable::MemTable;
use crate::storage::wal::Wal;
pub struct CompactionThread {
handle: Option<thread::JoinHandle<()>>,
stop_flag: Arc<AtomicBool>,
}
impl CompactionThread {
pub fn spawn<T: crate::VectorType>(
interval: Duration,
memtable: Arc<Mutex<MemTable<T>>>,
wal: Arc<Mutex<Wal>>,
db_path: String,
storage_mode: crate::database::StorageMode,
) -> Self {
let stop_flag = Arc::new(AtomicBool::new(false));
let stop = stop_flag.clone();
let handle = thread::spawn(move || {
loop {
let mut elapsed = Duration::ZERO;
let tick = Duration::from_millis(200);
while elapsed < interval {
if stop.load(Ordering::Relaxed) {
return;
}
thread::sleep(tick);
elapsed += tick;
}
if stop.load(Ordering::Relaxed) {
return;
}
{
let mut mt = memtable.lock().unwrap_or_else(|p| {
tracing::warn!("Compaction thread: MemTable Mutex poisoned, recovering...");
p.into_inner()
});
mt.ensure_vectors_cache();
}
let mut mt = memtable.lock().unwrap_or_else(|p| {
tracing::warn!("Compaction thread: MemTable Mutex poisoned, recovering...");
p.into_inner()
});
tracing::info!(
"Compaction I/O started for {}: foreground queries will be blocked during I/O",
db_path.clone()
);
match file_format::save(&mut mt, &db_path, storage_mode) {
Ok(_) => {
let mut w = wal.lock().unwrap_or_else(|p| {
tracing::warn!("Compaction thread: WAL Mutex poisoned, recovering...");
p.into_inner()
});
let _ = w.clear();
drop(w); drop(mt); tracing::debug!("Auto-compaction completed for {}", db_path);
}
Err(e) => {
tracing::error!("Auto-compaction failed for {}: {}", db_path, e);
}
}
}
});
Self {
handle: Some(handle),
stop_flag,
}
}
pub fn stop(&mut self) {
self.stop_flag.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}
impl Drop for CompactionThread {
fn drop(&mut self) {
self.stop();
}
}