kstone_core/
background.rs

1/// Background task management for LSM operations
2///
3/// Provides a background worker thread that performs compaction operations
4/// asynchronously without blocking database operations.
5
6use crate::compaction::{CompactionConfig, CompactionStatsAtomic};
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11use tracing::{debug, info, warn};
12
13/// Request to perform compaction on a stripe
14#[derive(Debug, Clone)]
15pub struct CompactionRequest {
16    /// Stripe ID to compact
17    pub stripe_id: usize,
18}
19
20/// Background worker for running compaction tasks
21pub struct BackgroundWorker {
22    /// Worker thread handle
23    handle: Option<JoinHandle<()>>,
24
25    /// Shutdown signal
26    shutdown: Arc<AtomicBool>,
27
28    /// Work queue (stripes needing compaction)
29    work_queue: Arc<Mutex<Vec<CompactionRequest>>>,
30
31    /// Compaction configuration
32    config: CompactionConfig,
33
34    /// Statistics
35    stats: CompactionStatsAtomic,
36}
37
38impl BackgroundWorker {
39    /// Create a new background worker
40    pub fn new(config: CompactionConfig, stats: CompactionStatsAtomic) -> Self {
41        Self {
42            handle: None,
43            shutdown: Arc::new(AtomicBool::new(false)),
44            work_queue: Arc::new(Mutex::new(Vec::new())),
45            config,
46            stats,
47        }
48    }
49
50    /// Start the background worker thread
51    ///
52    /// The worker will periodically check the work queue and process compaction requests.
53    pub fn start(&mut self) {
54        if self.handle.is_some() {
55            warn!("Background worker already running");
56            return;
57        }
58
59        let shutdown = Arc::clone(&self.shutdown);
60        let work_queue = Arc::clone(&self.work_queue);
61        let config = self.config.clone();
62        let stats = self.stats.clone();
63
64        info!("Starting background compaction worker");
65
66        let handle = thread::spawn(move || {
67            Self::worker_loop(shutdown, work_queue, config, stats);
68        });
69
70        self.handle = Some(handle);
71    }
72
73    /// Worker loop that runs in background thread
74    fn worker_loop(
75        shutdown: Arc<AtomicBool>,
76        work_queue: Arc<Mutex<Vec<CompactionRequest>>>,
77        config: CompactionConfig,
78        _stats: CompactionStatsAtomic,
79    ) {
80        debug!("Background worker loop started");
81
82        let check_interval = Duration::from_secs(config.check_interval_secs);
83
84        while !shutdown.load(Ordering::Relaxed) {
85            // Check for work
86            let work = {
87                let mut queue = work_queue.lock().unwrap();
88                if queue.is_empty() {
89                    None
90                } else {
91                    // Take up to max_concurrent_compactions items
92                    let count = queue.len().min(config.max_concurrent_compactions);
93                    Some(queue.drain(..count).collect::<Vec<_>>())
94                }
95            };
96
97            if let Some(requests) = work {
98                debug!("Processing {} compaction requests", requests.len());
99
100                for request in requests {
101                    if shutdown.load(Ordering::Relaxed) {
102                        debug!("Shutdown signal received, stopping compaction");
103                        break;
104                    }
105
106                    // TODO: Actually perform compaction here
107                    // This will be connected to LsmEngine in Task 4
108                    debug!("Would compact stripe {}", request.stripe_id);
109                }
110            }
111
112            // Sleep before checking again
113            thread::sleep(check_interval);
114        }
115
116        info!("Background worker loop exited");
117    }
118
119    /// Queue a compaction request for a stripe
120    pub fn queue_compaction(&self, stripe_id: usize) {
121        let mut queue = self.work_queue.lock().unwrap();
122
123        // Don't queue duplicate requests
124        if queue.iter().any(|r| r.stripe_id == stripe_id) {
125            debug!("Stripe {} already queued for compaction", stripe_id);
126            return;
127        }
128
129        debug!("Queuing compaction for stripe {}", stripe_id);
130        queue.push(CompactionRequest { stripe_id });
131    }
132
133    /// Get the current work queue size
134    pub fn queue_size(&self) -> usize {
135        self.work_queue.lock().unwrap().len()
136    }
137
138    /// Initiate graceful shutdown
139    ///
140    /// Signals the worker thread to stop and waits for it to finish.
141    pub fn shutdown(&mut self) {
142        info!("Initiating background worker shutdown");
143        self.shutdown.store(true, Ordering::Relaxed);
144
145        if let Some(handle) = self.handle.take() {
146            debug!("Waiting for background worker thread to exit");
147            if let Err(e) = handle.join() {
148                warn!("Error joining background worker thread: {:?}", e);
149            }
150        }
151
152        info!("Background worker shutdown complete");
153    }
154
155    /// Check if the worker is running
156    pub fn is_running(&self) -> bool {
157        self.handle.is_some() && !self.shutdown.load(Ordering::Relaxed)
158    }
159}
160
161impl Drop for BackgroundWorker {
162    fn drop(&mut self) {
163        if self.is_running() {
164            self.shutdown();
165        }
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172
173    #[test]
174    fn test_background_worker_start_stop() {
175        let config = CompactionConfig::new();
176        let stats = CompactionStatsAtomic::new();
177        let mut worker = BackgroundWorker::new(config, stats);
178
179        assert!(!worker.is_running());
180
181        worker.start();
182        assert!(worker.is_running());
183
184        worker.shutdown();
185        assert!(!worker.is_running());
186    }
187
188    #[test]
189    fn test_background_worker_queue() {
190        let config = CompactionConfig::new();
191        let stats = CompactionStatsAtomic::new();
192        let worker = BackgroundWorker::new(config, stats);
193
194        assert_eq!(worker.queue_size(), 0);
195
196        worker.queue_compaction(1);
197        assert_eq!(worker.queue_size(), 1);
198
199        worker.queue_compaction(2);
200        assert_eq!(worker.queue_size(), 2);
201
202        // Duplicate should not be queued
203        worker.queue_compaction(1);
204        assert_eq!(worker.queue_size(), 2);
205    }
206
207    #[test]
208    fn test_background_worker_auto_shutdown_on_drop() {
209        let config = CompactionConfig::new();
210        let stats = CompactionStatsAtomic::new();
211        let mut worker = BackgroundWorker::new(config, stats);
212
213        worker.start();
214        assert!(worker.is_running());
215
216        // Drop should trigger shutdown
217        drop(worker);
218        // If this test completes, shutdown worked
219    }
220
221    #[test]
222    fn test_background_worker_disabled_config() {
223        let config = CompactionConfig::disabled();
224        let stats = CompactionStatsAtomic::new();
225        let mut worker = BackgroundWorker::new(config.clone(), stats);
226
227        // Start worker even though config is disabled
228        // (The actual compaction logic will check config.enabled)
229        worker.start();
230        assert!(worker.is_running());
231
232        worker.shutdown();
233    }
234
235    #[test]
236    fn test_background_worker_multiple_queued_items() {
237        let config = CompactionConfig::new().with_max_concurrent(2);
238        let stats = CompactionStatsAtomic::new();
239        let worker = BackgroundWorker::new(config, stats);
240
241        // Queue multiple stripes
242        for i in 0..10 {
243            worker.queue_compaction(i);
244        }
245
246        assert_eq!(worker.queue_size(), 10);
247    }
248}