vectx_core/
background.rs

1// Background I/O system inspired by Redis's BIO (Background I/O)
2// Implements job queues with worker threads for async operations
3
4use std::collections::VecDeque;
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8
/// Kinds of work the background system can run (modelled on Redis BIO job classes).
///
/// The explicit discriminants are cast with `as usize` and used as array/vector
/// indices by the job system, so they must stay dense and start at zero.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackgroundJobType {
    /// HNSW index rebuild.
    HnswRebuild = 0,
    /// Lazy memory freeing.
    LazyFree = 1,
}
15
16/// Background job trait
17pub trait BackgroundJob: Send + 'static {
18    fn execute(self: Box<Self>);
19    fn job_type(&self) -> BackgroundJobType;
20}
21
22/// Background worker thread
23struct BackgroundWorker {
24    #[allow(dead_code)]
25    worker_id: usize,
26    #[allow(dead_code)]
27    job_type: BackgroundJobType,
28    jobs: Arc<Mutex<VecDeque<Box<dyn BackgroundJob>>>>,
29    condvar: Arc<Condvar>,
30    running: Arc<AtomicBool>,
31}
32
33impl BackgroundWorker {
34    fn new(worker_id: usize, job_type: BackgroundJobType) -> Self {
35        Self {
36            worker_id,
37            job_type,
38            jobs: Arc::new(Mutex::new(VecDeque::new())),
39            condvar: Arc::new(Condvar::new()),
40            running: Arc::new(AtomicBool::new(true)),
41        }
42    }
43
44    fn submit(&self, job: Box<dyn BackgroundJob>) {
45        let mut jobs = self.jobs.lock().unwrap();
46        jobs.push_back(job);
47        self.condvar.notify_one();
48    }
49
50    fn pending_jobs(&self) -> usize {
51        self.jobs.lock().unwrap().len()
52    }
53
54    fn shutdown(&self) {
55        self.running.store(false, Ordering::Release);
56        self.condvar.notify_all();
57    }
58}
59
60/// Background job system (Redis-style BIO)
61pub struct BackgroundJobSystem {
62    workers: Vec<Arc<BackgroundWorker>>,
63    job_counters: Arc<[AtomicU64; 2]>, // One counter per job type
64}
65
66impl BackgroundJobSystem {
67    /// Create a new background job system
68    pub fn new() -> Self {
69        let mut workers = Vec::new();
70        let mut handles = Vec::new();
71
72        // Create workers for each job type
73        for job_type in [BackgroundJobType::HnswRebuild, BackgroundJobType::LazyFree] {
74            let worker = BackgroundWorker::new(0, job_type);
75            let worker_arc = Arc::new(worker);
76            let worker_for_thread = worker_arc.clone();
77            let handle = thread::Builder::new()
78                .name(format!("bg-worker-{:?}-0", job_type))
79                .spawn(move || {
80                    let jobs = worker_for_thread.jobs.clone();
81                    let condvar = worker_for_thread.condvar.clone();
82                    let running = worker_for_thread.running.clone();
83                    
84                    loop {
85                        let mut jobs_guard = jobs.lock().unwrap();
86                        
87                        // Wait for jobs or shutdown signal
88                        while jobs_guard.is_empty() && running.load(Ordering::Acquire) {
89                            jobs_guard = condvar.wait(jobs_guard).unwrap();
90                        }
91
92                        // Check if we should shutdown
93                        if !running.load(Ordering::Acquire) && jobs_guard.is_empty() {
94                            break;
95                        }
96
97                        // Process jobs in FIFO order
98                        while let Some(job) = jobs_guard.pop_front() {
99                            drop(jobs_guard); // Release lock before executing
100                            job.execute();
101                            jobs_guard = jobs.lock().unwrap();
102                        }
103                    }
104                })
105                .expect("Failed to spawn background worker thread");
106            
107            handles.push(handle);
108            workers.push(worker_arc);
109        }
110
111        // Don't wait for handles - let them run in background
112        std::mem::forget(handles);
113
114        Self {
115            workers,
116            job_counters: Arc::new([
117                AtomicU64::new(0), // HnswRebuild
118                AtomicU64::new(0), // LazyFree
119            ]),
120        }
121    }
122
123    /// Submit a background job
124    pub fn submit(&self, job: Box<dyn BackgroundJob>) {
125        let job_type = job.job_type();
126        let worker = &self.workers[job_type as usize];
127        
128        self.job_counters[job_type as usize].fetch_add(1, Ordering::Relaxed);
129        worker.submit(job);
130    }
131
132    /// Get pending jobs count for a job type
133    pub fn pending_jobs(&self, job_type: BackgroundJobType) -> usize {
134        self.workers[job_type as usize].pending_jobs()
135    }
136
137    /// Get total jobs processed for a job type
138    pub fn jobs_processed(&self, job_type: BackgroundJobType) -> u64 {
139        self.job_counters[job_type as usize].load(Ordering::Relaxed)
140    }
141
142    /// Shutdown all workers
143    pub fn shutdown(&self) {
144        for worker in &self.workers {
145            worker.shutdown();
146        }
147    }
148}
149
150impl Default for BackgroundJobSystem {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
156/// HNSW rebuild job
157pub struct HnswRebuildJob {
158    points: Vec<crate::Point>,
159    hnsw: Arc<parking_lot::RwLock<crate::HnswIndex>>,
160    built_flag: Arc<parking_lot::RwLock<bool>>,
161    rebuilding_flag: Arc<AtomicBool>,
162}
163
164impl HnswRebuildJob {
165    pub fn new(
166        points: Vec<crate::Point>,
167        hnsw: Arc<parking_lot::RwLock<crate::HnswIndex>>,
168        built_flag: Arc<parking_lot::RwLock<bool>>,
169        rebuilding_flag: Arc<AtomicBool>,
170    ) -> Self {
171        Self {
172            points,
173            hnsw,
174            built_flag,
175            rebuilding_flag,
176        }
177    }
178}
179
180impl BackgroundJob for HnswRebuildJob {
181    fn execute(self: Box<Self>) {
182        // Rebuild HNSW index from all points
183        let mut new_index = crate::HnswIndex::new(16, 3);
184        for point in self.points {
185            new_index.insert(point);
186        }
187
188        // Swap in the new index
189        *self.hnsw.write() = new_index;
190        *self.built_flag.write() = true;
191        self.rebuilding_flag.store(false, Ordering::Release);
192    }
193
194    fn job_type(&self) -> BackgroundJobType {
195        BackgroundJobType::HnswRebuild
196    }
197}
198
199/// Global background job system (initialized on first use)
200static BACKGROUND_SYSTEM: std::sync::OnceLock<Arc<BackgroundJobSystem>> = std::sync::OnceLock::new();
201
202/// Get the global background job system
203pub fn get_background_system() -> Arc<BackgroundJobSystem> {
204    BACKGROUND_SYSTEM.get_or_init(|| {
205        Arc::new(BackgroundJobSystem::new())
206    }).clone()
207}
208