kstone_core/
background.rs1use crate::compaction::{CompactionConfig, CompactionStatsAtomic};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11use tracing::{debug, info, warn};
12
/// A unit of work for the background worker: a request to compact one stripe.
#[derive(Clone, Debug)]
pub struct CompactionRequest {
    /// Identifier of the stripe that should be compacted.
    pub stripe_id: usize,
}
19
20pub struct BackgroundWorker {
22 handle: Option<JoinHandle<()>>,
24
25 shutdown: Arc<AtomicBool>,
27
28 work_queue: Arc<Mutex<Vec<CompactionRequest>>>,
30
31 config: CompactionConfig,
33
34 stats: CompactionStatsAtomic,
36}
37
38impl BackgroundWorker {
39 pub fn new(config: CompactionConfig, stats: CompactionStatsAtomic) -> Self {
41 Self {
42 handle: None,
43 shutdown: Arc::new(AtomicBool::new(false)),
44 work_queue: Arc::new(Mutex::new(Vec::new())),
45 config,
46 stats,
47 }
48 }
49
50 pub fn start(&mut self) {
54 if self.handle.is_some() {
55 warn!("Background worker already running");
56 return;
57 }
58
59 let shutdown = Arc::clone(&self.shutdown);
60 let work_queue = Arc::clone(&self.work_queue);
61 let config = self.config.clone();
62 let stats = self.stats.clone();
63
64 info!("Starting background compaction worker");
65
66 let handle = thread::spawn(move || {
67 Self::worker_loop(shutdown, work_queue, config, stats);
68 });
69
70 self.handle = Some(handle);
71 }
72
73 fn worker_loop(
75 shutdown: Arc<AtomicBool>,
76 work_queue: Arc<Mutex<Vec<CompactionRequest>>>,
77 config: CompactionConfig,
78 _stats: CompactionStatsAtomic,
79 ) {
80 debug!("Background worker loop started");
81
82 let check_interval = Duration::from_secs(config.check_interval_secs);
83
84 while !shutdown.load(Ordering::Relaxed) {
85 let work = {
87 let mut queue = work_queue.lock().unwrap();
88 if queue.is_empty() {
89 None
90 } else {
91 let count = queue.len().min(config.max_concurrent_compactions);
93 Some(queue.drain(..count).collect::<Vec<_>>())
94 }
95 };
96
97 if let Some(requests) = work {
98 debug!("Processing {} compaction requests", requests.len());
99
100 for request in requests {
101 if shutdown.load(Ordering::Relaxed) {
102 debug!("Shutdown signal received, stopping compaction");
103 break;
104 }
105
106 debug!("Would compact stripe {}", request.stripe_id);
109 }
110 }
111
112 thread::sleep(check_interval);
114 }
115
116 info!("Background worker loop exited");
117 }
118
119 pub fn queue_compaction(&self, stripe_id: usize) {
121 let mut queue = self.work_queue.lock().unwrap();
122
123 if queue.iter().any(|r| r.stripe_id == stripe_id) {
125 debug!("Stripe {} already queued for compaction", stripe_id);
126 return;
127 }
128
129 debug!("Queuing compaction for stripe {}", stripe_id);
130 queue.push(CompactionRequest { stripe_id });
131 }
132
133 pub fn queue_size(&self) -> usize {
135 self.work_queue.lock().unwrap().len()
136 }
137
138 pub fn shutdown(&mut self) {
142 info!("Initiating background worker shutdown");
143 self.shutdown.store(true, Ordering::Relaxed);
144
145 if let Some(handle) = self.handle.take() {
146 debug!("Waiting for background worker thread to exit");
147 if let Err(e) = handle.join() {
148 warn!("Error joining background worker thread: {:?}", e);
149 }
150 }
151
152 info!("Background worker shutdown complete");
153 }
154
155 pub fn is_running(&self) -> bool {
157 self.handle.is_some() && !self.shutdown.load(Ordering::Relaxed)
158 }
159}
160
161impl Drop for BackgroundWorker {
162 fn drop(&mut self) {
163 if self.is_running() {
164 self.shutdown();
165 }
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stopped worker around `config` with freshly created statistics.
    fn worker_with(config: CompactionConfig) -> BackgroundWorker {
        let stats = CompactionStatsAtomic::new();
        BackgroundWorker::new(config, stats)
    }

    /// A worker reports running only between `start` and `shutdown`.
    #[test]
    fn test_background_worker_start_stop() {
        let mut bg = worker_with(CompactionConfig::new());
        assert!(!bg.is_running());

        bg.start();
        assert!(bg.is_running());

        bg.shutdown();
        assert!(!bg.is_running());
    }

    /// Queuing grows the queue by one per new stripe and ignores duplicates.
    #[test]
    fn test_background_worker_queue() {
        let bg = worker_with(CompactionConfig::new());
        assert_eq!(bg.queue_size(), 0);

        // (stripe to queue, expected queue length afterwards); the repeated
        // stripe 1 must leave the length unchanged.
        let steps: [(usize, usize); 3] = [(1, 1), (2, 2), (1, 2)];
        for &(stripe, expected_len) in steps.iter() {
            bg.queue_compaction(stripe);
            assert_eq!(bg.queue_size(), expected_len);
        }
    }

    /// Dropping a started worker goes through its `Drop` implementation.
    #[test]
    fn test_background_worker_auto_shutdown_on_drop() {
        let mut bg = worker_with(CompactionConfig::new());

        bg.start();
        assert!(bg.is_running());

        drop(bg);
    }

    /// The worker thread can be started with the disabled configuration.
    #[test]
    fn test_background_worker_disabled_config() {
        let config = CompactionConfig::disabled();
        let mut bg = worker_with(config.clone());

        bg.start();
        assert!(bg.is_running());

        bg.shutdown();
    }

    /// Requests for distinct stripes all stay queued while the worker is not
    /// started.
    #[test]
    fn test_background_worker_multiple_queued_items() {
        let bg = worker_with(CompactionConfig::new().with_max_concurrent(2));

        (0..10).for_each(|stripe| bg.queue_compaction(stripe));

        assert_eq!(bg.queue_size(), 10);
    }
}