use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::{debug, info, warn};
use crate::engine::engine::MeruEngine;
/// Owner of the background flush and compaction tasks started by
/// [`BackgroundWorkers::spawn`].
///
/// Consuming the value with `shutdown()` signals the workers and waits for
/// them to finish; merely dropping it signals them and aborts whatever tasks
/// are still held.
pub struct BackgroundWorkers {
/// Shared stop flag. Stored as `true` by `shutdown()` and by `Drop`; every
/// worker loads it at the top of its loop.
shutdown_flag: Arc<AtomicBool>,
/// Wake-up used together with the flag: `notify_waiters()` is called right
/// after the flag is set so that workers blocked in their `select!` return.
shutdown_notify: Arc<Notify>,
/// Join handles of the flush worker tasks, in spawn order (worker id 0 first).
flush_handles: Vec<tokio::task::JoinHandle<()>>,
/// Join handles of the compaction worker tasks, in spawn order (worker id 0 first).
compaction_handles: Vec<tokio::task::JoinHandle<()>>,
}
impl BackgroundWorkers {
pub fn spawn(engine: Arc<MeruEngine>) -> Self {
let shutdown_flag = Arc::new(AtomicBool::new(false));
let shutdown_notify = Arc::new(Notify::new());
let flush_parallelism = engine.config.flush_parallelism;
let compaction_parallelism = engine.config.compaction_parallelism;
let mut flush_handles = Vec::new();
for i in 0..flush_parallelism {
let eng = engine.clone();
let flag = shutdown_flag.clone();
let notify = shutdown_notify.clone();
let handle = tokio::spawn(async move {
flush_worker(eng, flag, notify, i).await;
});
flush_handles.push(handle);
}
let mut compaction_handles = Vec::new();
for i in 0..compaction_parallelism {
let eng = engine.clone();
let flag = shutdown_flag.clone();
let notify = shutdown_notify.clone();
let handle = tokio::spawn(async move {
compaction_worker(eng, flag, notify, i).await;
});
compaction_handles.push(handle);
}
Self {
shutdown_flag,
shutdown_notify,
flush_handles,
compaction_handles,
}
}
pub async fn shutdown(mut self) {
self.shutdown_flag.store(true, Ordering::Release);
self.shutdown_notify.notify_waiters();
let flush = std::mem::take(&mut self.flush_handles);
let compaction = std::mem::take(&mut self.compaction_handles);
for h in flush {
let _ = h.await;
}
for h in compaction {
let _ = h.await;
}
}
}
impl Drop for BackgroundWorkers {
    /// Signals shutdown and aborts every worker task whose handle is still held.
    ///
    /// Unlike `shutdown()`, nothing is awaited here: remaining tasks are
    /// aborted, flush workers first and then compaction workers. When
    /// `shutdown()` has already taken the handles, both vectors are empty and
    /// only the signalling is repeated.
    fn drop(&mut self) {
        self.shutdown_flag.store(true, Ordering::Release);
        self.shutdown_notify.notify_waiters();

        let remaining = self
            .flush_handles
            .drain(..)
            .chain(self.compaction_handles.drain(..));
        remaining.for_each(|task| task.abort());
    }
}
/// Body of one background flush task.
///
/// Each iteration waits until shutdown is signalled, until
/// `engine.memtable.immutable_available` is notified, or until a 250 ms poll
/// interval elapses. It then calls `run_flush` repeatedly for as long as
/// `engine.memtable.oldest_immutable()` returns `Some`.
///
/// The task ends when `shutdown_flag` is observed as `true` at the top of the
/// loop or when the shutdown notification arrives while it is idle.
///
/// * `engine` - engine whose memtable is flushed.
/// * `shutdown_flag` - shared stop flag, loaded with `Acquire` ordering.
/// * `shutdown_notify` - notifier fired (via `notify_waiters`) after the flag is set.
/// * `id` - worker index, used only as a field in log events.
async fn flush_worker(
    engine: Arc<MeruEngine>,
    shutdown_flag: Arc<AtomicBool>,
    shutdown_notify: Arc<Notify>,
    id: usize,
) {
    const IDLE_POLL: std::time::Duration = std::time::Duration::from_millis(250);
    const RETRY_BACKOFF: std::time::Duration = std::time::Duration::from_millis(100);

    debug!(worker = id, "flush worker started");

    // Register for the shutdown broadcast once, *before* the first flag check.
    // `notify_waiters()` stores no permit, so a `Notified` future created after
    // the broadcast would miss it and the worker would sit in `select!` until
    // the poll interval expired. A future created up front is guaranteed to
    // observe any later `notify_waiters()` call, which closes that window.
    let shutdown_signal = shutdown_notify.notified();
    tokio::pin!(shutdown_signal);

    loop {
        if shutdown_flag.load(Ordering::Acquire) {
            info!(worker = id, "flush worker shutting down");
            break;
        }

        tokio::select! {
            _ = &mut shutdown_signal => {
                info!(worker = id, "flush worker shutting down");
                break;
            }
            _ = engine.memtable.immutable_available.notified() => {}
            // Periodic fallback so pending work is picked up even if the
            // notification above was not received.
            _ = tokio::time::sleep(IDLE_POLL) => {}
        }

        // Drain everything that is currently flushable.
        while engine.memtable.oldest_immutable().is_some() {
            if shutdown_flag.load(Ordering::Acquire) {
                // Leave the drain loop; the check at the top of the outer loop
                // logs and terminates the worker.
                break;
            }
            match crate::engine::flush::run_flush(&engine).await {
                Ok(_) => debug!(worker = id, "flush completed"),
                Err(e) => {
                    if engine.is_closed() {
                        // Stops retrying without logging. This exits only the
                        // drain loop; the worker keeps idling until shutdown is
                        // signalled.
                        // NOTE(review): confirm whether a closed engine should
                        // end the worker outright instead.
                        break;
                    }
                    warn!(worker = id, error = %e, "flush failed, will retry");
                    tokio::time::sleep(RETRY_BACKOFF).await;
                }
            }
        }
    }
}
/// Body of one background compaction task.
///
/// Once per second (or earlier if shutdown is signalled) the worker wakes up,
/// calls `engine.gc_pending_deletions()` and then `run_compaction`. A failed
/// compaction is logged and attempted again on the next tick.
///
/// The task ends when `shutdown_flag` is observed as `true`, when the shutdown
/// notification arrives while it is idle, or when a compaction fails and
/// `engine.is_closed()` returns `true`.
///
/// * `engine` - engine to garbage-collect and compact.
/// * `shutdown_flag` - shared stop flag, loaded with `Acquire` ordering.
/// * `shutdown_notify` - notifier fired (via `notify_waiters`) after the flag is set.
/// * `id` - worker index, used only as a field in log events.
async fn compaction_worker(
    engine: Arc<MeruEngine>,
    shutdown_flag: Arc<AtomicBool>,
    shutdown_notify: Arc<Notify>,
    id: usize,
) {
    const TICK: std::time::Duration = std::time::Duration::from_secs(1);

    debug!(worker = id, "compaction worker started");

    // Register for the shutdown broadcast once, *before* the first flag check.
    // `notify_waiters()` stores no permit, so a `Notified` future created after
    // the broadcast would miss it and the worker would sleep out a full tick
    // before noticing the flag. A future created up front is guaranteed to
    // observe any later `notify_waiters()` call.
    let shutdown_signal = shutdown_notify.notified();
    tokio::pin!(shutdown_signal);

    loop {
        if shutdown_flag.load(Ordering::Acquire) {
            info!(worker = id, "compaction worker shutting down");
            break;
        }

        tokio::select! {
            _ = &mut shutdown_signal => {
                info!(worker = id, "compaction worker shutting down");
                break;
            }
            _ = tokio::time::sleep(TICK) => {}
        }

        // Re-check after the sleep so no new work is started once shutdown
        // has been requested.
        if shutdown_flag.load(Ordering::Acquire) {
            break;
        }

        engine.gc_pending_deletions().await;
        match crate::engine::compaction::job::run_compaction(&engine).await {
            Ok(_) => debug!(worker = id, "compaction completed"),
            Err(e) => {
                if engine.is_closed() {
                    // Ends the worker without logging the error.
                    break;
                }
                warn!(worker = id, error = %e, "compaction failed, will retry");
            }
        }
    }
}