use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::thread;
use std::time::Duration;
use crate::storage::file_format;
use crate::storage::memtable::MemTable;
use crate::storage::wal::Wal;
pub struct CompactionThread {
handle: Option<thread::JoinHandle<()>>,
stop_flag: Arc<AtomicBool>,
}
impl CompactionThread {
pub fn spawn<T: crate::VectorType>(
interval: Duration,
memtable: Arc<Mutex<MemTable<T>>>,
wal: Arc<Mutex<Wal>>,
db_path: String,
storage_mode: crate::database::StorageMode,
) -> Self {
let stop_flag = Arc::new(AtomicBool::new(false));
let stop = stop_flag.clone();
let handle = thread::spawn(move || {
loop {
let mut elapsed = Duration::ZERO;
let tick = Duration::from_millis(200);
while elapsed < interval {
if stop.load(Ordering::Relaxed) {
return;
}
thread::sleep(tick);
elapsed += tick;
}
if stop.load(Ordering::Relaxed) {
return;
}
let mut mt = memtable.lock().unwrap_or_else(|p| {
tracing::warn!("Compaction thread: MemTable Mutex poisoned, recovering...");
p.into_inner()
});
tracing::info!("Compaction started for {}: foreground queries will be blocked during I/O", db_path);
let node_count = mt.node_count();
if matches!(storage_mode, crate::database::StorageMode::Mmap) && node_count >= 20_000 {
mt.ensure_vectors_cache();
let flat = mt.flat_vectors();
let dim = mt.dim();
tracing::info!("[{}] Rebuilding ERPC Accelerated Index (nodes={})...", db_path, node_count);
let start_erpc = std::time::Instant::now();
let effort = 0.6; let new_erpc = crate::index::erpc::ErpcIndex::build(flat, dim, effort);
mt.erpc_index = Some(new_erpc);
tracing::info!("[{}] ERPC block finalized in {:?}", db_path, start_erpc.elapsed());
} else if matches!(storage_mode, crate::database::StorageMode::Mmap) {
mt.erpc_index = None;
}
match file_format::save(&mut mt, &db_path, storage_mode) {
Ok(_) => {
drop(mt); let mut w = wal.lock().unwrap_or_else(|p| {
tracing::warn!("Compaction thread: WAL Mutex poisoned, recovering...");
p.into_inner()
});
let _ = w.clear();
tracing::debug!("Auto-compaction completed for {}", db_path);
}
Err(e) => {
tracing::error!("Auto-compaction failed for {}: {}", db_path, e);
}
}
}
});
Self {
handle: Some(handle),
stop_flag,
}
}
pub fn stop(&mut self) {
self.stop_flag.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}
impl Drop for CompactionThread {
fn drop(&mut self) {
self.stop();
}
}