use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackgroundJobType {
HnswRebuild = 0, LazyFree = 1, }
pub trait BackgroundJob: Send + 'static {
fn execute(self: Box<Self>);
fn job_type(&self) -> BackgroundJobType;
}
struct BackgroundWorker {
#[allow(dead_code)]
worker_id: usize,
#[allow(dead_code)]
job_type: BackgroundJobType,
jobs: Arc<Mutex<VecDeque<Box<dyn BackgroundJob>>>>,
condvar: Arc<Condvar>,
running: Arc<AtomicBool>,
}
impl BackgroundWorker {
fn new(worker_id: usize, job_type: BackgroundJobType) -> Self {
Self {
worker_id,
job_type,
jobs: Arc::new(Mutex::new(VecDeque::new())),
condvar: Arc::new(Condvar::new()),
running: Arc::new(AtomicBool::new(true)),
}
}
fn submit(&self, job: Box<dyn BackgroundJob>) {
let mut jobs = self.jobs.lock().unwrap();
jobs.push_back(job);
self.condvar.notify_one();
}
fn pending_jobs(&self) -> usize {
self.jobs.lock().unwrap().len()
}
fn shutdown(&self) {
self.running.store(false, Ordering::Release);
self.condvar.notify_all();
}
}
pub struct BackgroundJobSystem {
workers: Vec<Arc<BackgroundWorker>>,
job_counters: Arc<[AtomicU64; 2]>, }
impl BackgroundJobSystem {
pub fn new() -> Self {
let mut workers = Vec::new();
let mut handles = Vec::new();
for job_type in [BackgroundJobType::HnswRebuild, BackgroundJobType::LazyFree] {
let worker = BackgroundWorker::new(0, job_type);
let worker_arc = Arc::new(worker);
let worker_for_thread = worker_arc.clone();
let handle = thread::Builder::new()
.name(format!("bg-worker-{:?}-0", job_type))
.spawn(move || {
let jobs = worker_for_thread.jobs.clone();
let condvar = worker_for_thread.condvar.clone();
let running = worker_for_thread.running.clone();
loop {
let mut jobs_guard = jobs.lock().unwrap();
while jobs_guard.is_empty() && running.load(Ordering::Acquire) {
jobs_guard = condvar.wait(jobs_guard).unwrap();
}
if !running.load(Ordering::Acquire) && jobs_guard.is_empty() {
break;
}
while let Some(job) = jobs_guard.pop_front() {
drop(jobs_guard); job.execute();
jobs_guard = jobs.lock().unwrap();
}
}
})
.expect("Failed to spawn background worker thread");
handles.push(handle);
workers.push(worker_arc);
}
std::mem::forget(handles);
Self {
workers,
job_counters: Arc::new([
AtomicU64::new(0), AtomicU64::new(0), ]),
}
}
pub fn submit(&self, job: Box<dyn BackgroundJob>) {
let job_type = job.job_type();
let worker = &self.workers[job_type as usize];
self.job_counters[job_type as usize].fetch_add(1, Ordering::Relaxed);
worker.submit(job);
}
pub fn pending_jobs(&self, job_type: BackgroundJobType) -> usize {
self.workers[job_type as usize].pending_jobs()
}
pub fn jobs_processed(&self, job_type: BackgroundJobType) -> u64 {
self.job_counters[job_type as usize].load(Ordering::Relaxed)
}
pub fn shutdown(&self) {
for worker in &self.workers {
worker.shutdown();
}
}
}
impl Default for BackgroundJobSystem {
fn default() -> Self {
Self::new()
}
}
pub struct HnswRebuildJob {
points: Vec<crate::Point>,
hnsw: Arc<parking_lot::RwLock<crate::HnswIndex>>,
built_flag: Arc<parking_lot::RwLock<bool>>,
rebuilding_flag: Arc<AtomicBool>,
}
impl HnswRebuildJob {
pub fn new(
points: Vec<crate::Point>,
hnsw: Arc<parking_lot::RwLock<crate::HnswIndex>>,
built_flag: Arc<parking_lot::RwLock<bool>>,
rebuilding_flag: Arc<AtomicBool>,
) -> Self {
Self {
points,
hnsw,
built_flag,
rebuilding_flag,
}
}
}
impl BackgroundJob for HnswRebuildJob {
fn execute(self: Box<Self>) {
let mut new_index = crate::HnswIndex::new(16, 3);
for point in self.points {
new_index.insert(point);
}
*self.hnsw.write() = new_index;
*self.built_flag.write() = true;
self.rebuilding_flag.store(false, Ordering::Release);
}
fn job_type(&self) -> BackgroundJobType {
BackgroundJobType::HnswRebuild
}
}
static BACKGROUND_SYSTEM: std::sync::OnceLock<Arc<BackgroundJobSystem>> = std::sync::OnceLock::new();
pub fn get_background_system() -> Arc<BackgroundJobSystem> {
BACKGROUND_SYSTEM.get_or_init(|| {
Arc::new(BackgroundJobSystem::new())
}).clone()
}