1use std::collections::VecDeque;
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum BackgroundJobType {
12 HnswRebuild = 0, LazyFree = 1, }
15
16pub trait BackgroundJob: Send + 'static {
18 fn execute(self: Box<Self>);
19 fn job_type(&self) -> BackgroundJobType;
20}
21
22struct BackgroundWorker {
24 #[allow(dead_code)]
25 worker_id: usize,
26 #[allow(dead_code)]
27 job_type: BackgroundJobType,
28 jobs: Arc<Mutex<VecDeque<Box<dyn BackgroundJob>>>>,
29 condvar: Arc<Condvar>,
30 running: Arc<AtomicBool>,
31}
32
33impl BackgroundWorker {
34 fn new(worker_id: usize, job_type: BackgroundJobType) -> Self {
35 Self {
36 worker_id,
37 job_type,
38 jobs: Arc::new(Mutex::new(VecDeque::new())),
39 condvar: Arc::new(Condvar::new()),
40 running: Arc::new(AtomicBool::new(true)),
41 }
42 }
43
44 fn submit(&self, job: Box<dyn BackgroundJob>) {
45 let mut jobs = self.jobs.lock().unwrap();
46 jobs.push_back(job);
47 self.condvar.notify_one();
48 }
49
50 fn pending_jobs(&self) -> usize {
51 self.jobs.lock().unwrap().len()
52 }
53
54 fn shutdown(&self) {
55 self.running.store(false, Ordering::Release);
56 self.condvar.notify_all();
57 }
58}
59
60pub struct BackgroundJobSystem {
62 workers: Vec<Arc<BackgroundWorker>>,
63 job_counters: Arc<[AtomicU64; 2]>, }
65
66impl BackgroundJobSystem {
67 pub fn new() -> Self {
69 let mut workers = Vec::new();
70 let mut handles = Vec::new();
71
72 for job_type in [BackgroundJobType::HnswRebuild, BackgroundJobType::LazyFree] {
74 let worker = BackgroundWorker::new(0, job_type);
75 let worker_arc = Arc::new(worker);
76 let worker_for_thread = worker_arc.clone();
77 let handle = thread::Builder::new()
78 .name(format!("bg-worker-{:?}-0", job_type))
79 .spawn(move || {
80 let jobs = worker_for_thread.jobs.clone();
81 let condvar = worker_for_thread.condvar.clone();
82 let running = worker_for_thread.running.clone();
83
84 loop {
85 let mut jobs_guard = jobs.lock().unwrap();
86
87 while jobs_guard.is_empty() && running.load(Ordering::Acquire) {
89 jobs_guard = condvar.wait(jobs_guard).unwrap();
90 }
91
92 if !running.load(Ordering::Acquire) && jobs_guard.is_empty() {
94 break;
95 }
96
97 while let Some(job) = jobs_guard.pop_front() {
99 drop(jobs_guard); job.execute();
101 jobs_guard = jobs.lock().unwrap();
102 }
103 }
104 })
105 .expect("Failed to spawn background worker thread");
106
107 handles.push(handle);
108 workers.push(worker_arc);
109 }
110
111 std::mem::forget(handles);
113
114 Self {
115 workers,
116 job_counters: Arc::new([
117 AtomicU64::new(0), AtomicU64::new(0), ]),
120 }
121 }
122
123 pub fn submit(&self, job: Box<dyn BackgroundJob>) {
125 let job_type = job.job_type();
126 let worker = &self.workers[job_type as usize];
127
128 self.job_counters[job_type as usize].fetch_add(1, Ordering::Relaxed);
129 worker.submit(job);
130 }
131
132 pub fn pending_jobs(&self, job_type: BackgroundJobType) -> usize {
134 self.workers[job_type as usize].pending_jobs()
135 }
136
137 pub fn jobs_processed(&self, job_type: BackgroundJobType) -> u64 {
139 self.job_counters[job_type as usize].load(Ordering::Relaxed)
140 }
141
142 pub fn shutdown(&self) {
144 for worker in &self.workers {
145 worker.shutdown();
146 }
147 }
148}
149
150impl Default for BackgroundJobSystem {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156pub struct HnswRebuildJob {
158 points: Vec<crate::Point>,
159 hnsw: Arc<parking_lot::RwLock<crate::HnswIndex>>,
160 built_flag: Arc<parking_lot::RwLock<bool>>,
161 rebuilding_flag: Arc<AtomicBool>,
162}
163
164impl HnswRebuildJob {
165 pub fn new(
166 points: Vec<crate::Point>,
167 hnsw: Arc<parking_lot::RwLock<crate::HnswIndex>>,
168 built_flag: Arc<parking_lot::RwLock<bool>>,
169 rebuilding_flag: Arc<AtomicBool>,
170 ) -> Self {
171 Self {
172 points,
173 hnsw,
174 built_flag,
175 rebuilding_flag,
176 }
177 }
178}
179
180impl BackgroundJob for HnswRebuildJob {
181 fn execute(self: Box<Self>) {
182 let mut new_index = crate::HnswIndex::new(16, 3);
184 for point in self.points {
185 new_index.insert(point);
186 }
187
188 *self.hnsw.write() = new_index;
190 *self.built_flag.write() = true;
191 self.rebuilding_flag.store(false, Ordering::Release);
192 }
193
194 fn job_type(&self) -> BackgroundJobType {
195 BackgroundJobType::HnswRebuild
196 }
197}
198
199static BACKGROUND_SYSTEM: std::sync::OnceLock<Arc<BackgroundJobSystem>> = std::sync::OnceLock::new();
201
202pub fn get_background_system() -> Arc<BackgroundJobSystem> {
204 BACKGROUND_SYSTEM.get_or_init(|| {
205 Arc::new(BackgroundJobSystem::new())
206 }).clone()
207}
208