Skip to main content

noxu_engine/
daemon_manager.rs

1//! Background daemon lifecycle management.
2
3use crate::engine_config::EngineConfig;
4use noxu_cleaner::Cleaner;
5use noxu_evictor::{EvictionSource, Evictor};
6use noxu_recovery::Checkpointer;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
/// A wakeup handle used by daemon threads to sleep with early-exit on shutdown.
///
/// Each daemon receives a clone of this handle; clones share one flag and one
/// condvar. Once `notify()` has been called the flag stays set, so every
/// current *and future* `wait_timeout` returns `true` without sleeping. This
/// closes the window where a shutdown notification races ahead of the
/// daemon's next wait.
#[derive(Clone)]
struct WakeHandle {
    // The `bool` becomes `true` once `notify()` has run. It is guarded by the
    // mutex so the check-then-wait in `wait_timeout` cannot miss a notify.
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    fn new() -> Self {
        Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Sleep for up to `duration`, returning early if `notify()` is called
    /// (or was already called before this wait began).
    ///
    /// Returns `true` if a shutdown notification has been delivered, or
    /// `false` if the timeout elapsed without one.
    ///
    /// The flag is tested under the lock before blocking, so a `notify()`
    /// that happened earlier is never lost. Spurious condvar wakeups are
    /// absorbed by waiting again for the remaining time.
    fn wait_timeout(&self, duration: Duration) -> bool {
        let (lock, cvar) = &*self.pair;
        // A poisoned mutex still holds a valid `bool`. Recover it rather than
        // panicking, because this runs on the shutdown path (including `Drop`).
        let guard = lock.lock().unwrap_or_else(|e| e.into_inner());
        let (guard, _) = cvar
            .wait_timeout_while(guard, duration, |notified| !*notified)
            .unwrap_or_else(|e| e.into_inner());
        *guard
    }

    /// Notify the sleeping daemon to wake up immediately.
    ///
    /// The flag is sticky: subsequent `wait_timeout` calls on any clone also
    /// return `true` at once.
    fn notify(&self) {
        let (lock, cvar) = &*self.pair;
        *lock.lock().unwrap_or_else(|e| e.into_inner()) = true;
        cvar.notify_all();
    }
}
45
46/// Manages the lifecycle of background daemon threads.
47///
48/// The DaemonManager is responsible for:
49/// - Starting daemon threads (evictor, cleaner, checkpointer)
50/// - Coordinating shutdown of all daemons
51/// - Tracking daemon running state
52///
53/// Each daemon runs in its own thread, periodically waking up to perform work.
54/// On shutdown, daemons are notified via a Condvar so they exit immediately
55/// instead of sleeping through their full wakeup interval.
56pub struct DaemonManager {
57    /// Shutdown signal shared by all daemon threads.
58    shutdown: Arc<AtomicBool>,
59
60    /// Wakeup handles for each daemon (used to unblock their sleep on shutdown).
61    evictor_wake: WakeHandle,
62    cleaner_wake: WakeHandle,
63    checkpointer_wake: WakeHandle,
64
65    /// Evictor daemon thread handle.
66    evictor_handle: Option<JoinHandle<()>>,
67
68    /// Cleaner daemon thread handle.
69    cleaner_handle: Option<JoinHandle<()>>,
70
71    /// Checkpointer daemon thread handle.
72    checkpointer_handle: Option<JoinHandle<()>>,
73
74    /// Whether evictor is enabled.
75    evictor_enabled: bool,
76
77    /// Whether cleaner is enabled.
78    cleaner_enabled: bool,
79
80    /// Whether checkpointer is enabled.
81    checkpointer_enabled: bool,
82
83    /// Evictor wakeup interval.
84    evictor_wakeup_ms: u64,
85
86    /// Cleaner wakeup interval.
87    cleaner_wakeup_ms: u64,
88
89    /// Checkpointer wakeup interval.
90    checkpointer_wakeup_ms: u64,
91}
92
93impl DaemonManager {
94    /// Creates a new DaemonManager from the given configuration.
95    ///
96    /// Daemons are not started until `start_daemons()` is called.
97    pub fn new(config: &EngineConfig) -> Self {
98        Self {
99            shutdown: Arc::new(AtomicBool::new(false)),
100            evictor_wake: WakeHandle::new(),
101            cleaner_wake: WakeHandle::new(),
102            checkpointer_wake: WakeHandle::new(),
103            evictor_handle: None,
104            cleaner_handle: None,
105            checkpointer_handle: None,
106            evictor_enabled: config.evictor_enabled,
107            cleaner_enabled: config.cleaner_enabled,
108            checkpointer_enabled: config.checkpointer_enabled,
109            evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
110            cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
111            checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
112        }
113    }
114
115    /// Starts all enabled daemon threads.
116    ///
117    /// Each daemon runs in a loop:
118    /// 1. Sleep for its wakeup interval
119    /// 2. Check shutdown flag
120    /// 3. Perform work (eviction, cleaning, checkpoint)
121    /// 4. Repeat
122    ///
123    /// # Arguments
124    /// * `evictor` - The evictor to use for eviction operations
125    /// * `cleaner` - The cleaner to use for cleaning operations
126    /// * `checkpointer` - The checkpointer to use for checkpoint operations
127    pub fn start_daemons(
128        &mut self,
129        evictor: Arc<Evictor>,
130        cleaner: Arc<Cleaner>,
131        checkpointer: Arc<Checkpointer>,
132    ) {
133        // Start evictor daemon
134        if self.evictor_enabled {
135            let shutdown = Arc::clone(&self.shutdown);
136            let wakeup_ms = self.evictor_wakeup_ms;
137            let evictor = Arc::clone(&evictor);
138            let wake = self.evictor_wake.clone();
139
140            self.evictor_handle = Some(thread::spawn(move || {
141                log::info!("Evictor daemon started");
142                while !shutdown.load(Ordering::Relaxed) {
143                    // Sleep for the wakeup interval, but return early on shutdown.
144                    let notified =
145                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
146                    if notified || shutdown.load(Ordering::Relaxed) {
147                        break;
148                    }
149
150                    // Perform eviction
151                    let result = evictor.do_evict(EvictionSource::Daemon);
152                    if result.nodes_evicted > 0 {
153                        log::debug!(
154                            "Evictor: evicted {} nodes, {} bytes",
155                            result.nodes_evicted,
156                            result.bytes_evicted
157                        );
158                    }
159                }
160                log::info!("Evictor daemon stopped");
161            }));
162        }
163
164        // Start cleaner daemon
165        if self.cleaner_enabled {
166            let shutdown = Arc::clone(&self.shutdown);
167            let wakeup_ms = self.cleaner_wakeup_ms;
168            let cleaner = Arc::clone(&cleaner);
169            let wake = self.cleaner_wake.clone();
170
171            self.cleaner_handle = Some(thread::spawn(move || {
172                log::info!("Cleaner daemon started");
173                while !shutdown.load(Ordering::Relaxed) {
174                    // Sleep for the wakeup interval, but return early on shutdown.
175                    let notified =
176                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
177                    if notified || shutdown.load(Ordering::Relaxed) {
178                        break;
179                    }
180
181                    // Perform cleaning
182                    match cleaner.do_clean(1, false) {
183                        Ok(result) => {
184                            if result.files_cleaned > 0 {
185                                log::debug!(
186                                    "Cleaner: cleaned {} files, deleted {} files",
187                                    result.files_cleaned,
188                                    result.files_deleted
189                                );
190                            }
191                        }
192                        Err(e) => {
193                            log::warn!("Cleaner error: {}", e);
194                        }
195                    }
196                }
197                log::info!("Cleaner daemon stopped");
198            }));
199        }
200
201        // Start checkpointer daemon
202        if self.checkpointer_enabled {
203            let shutdown = Arc::clone(&self.shutdown);
204            let wakeup_ms = self.checkpointer_wakeup_ms;
205            let checkpointer = Arc::clone(&checkpointer);
206            let wake = self.checkpointer_wake.clone();
207
208            self.checkpointer_handle = Some(thread::spawn(move || {
209                log::info!("Checkpointer daemon started");
210                while !shutdown.load(Ordering::Relaxed) {
211                    // Sleep for the wakeup interval, but return early on shutdown.
212                    let notified =
213                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
214                    if notified || shutdown.load(Ordering::Relaxed) {
215                        break;
216                    }
217
218                    // Perform checkpoint
219                    match checkpointer.do_checkpoint("daemon") {
220                        Ok(result) => {
221                            log::debug!(
222                                "Checkpoint: id={}, flushed {} nodes",
223                                result.checkpoint_id,
224                                result.total_nodes_flushed()
225                            );
226                        }
227                        Err(e) => {
228                            log::warn!("Checkpoint error: {}", e);
229                        }
230                    }
231                }
232                log::info!("Checkpointer daemon stopped");
233            }));
234        }
235    }
236
237    /// Signals shutdown and waits for all daemon threads to complete.
238    ///
239    /// This method:
240    /// 1. Sets the shutdown flag
241    /// 2. Notifies all sleeping daemons via their Condvar so they wake immediately
242    /// 3. Joins all daemon thread handles
243    /// 4. Waits for all threads to exit cleanly
244    pub fn shutdown(&mut self) {
245        // Signal shutdown
246        self.shutdown.store(true, Ordering::Relaxed);
247
248        // Wake all sleeping daemons immediately so they don't wait out their
249        // full sleep interval before noticing the shutdown flag.
250        self.evictor_wake.notify();
251        self.cleaner_wake.notify();
252        self.checkpointer_wake.notify();
253
254        // Join evictor
255        if let Some(handle) = self.evictor_handle.take()
256            && let Err(e) = handle.join()
257        {
258            log::error!("Failed to join evictor thread: {:?}", e);
259        }
260
261        // Join cleaner
262        if let Some(handle) = self.cleaner_handle.take()
263            && let Err(e) = handle.join()
264        {
265            log::error!("Failed to join cleaner thread: {:?}", e);
266        }
267
268        // Join checkpointer
269        if let Some(handle) = self.checkpointer_handle.take()
270            && let Err(e) = handle.join()
271        {
272            log::error!("Failed to join checkpointer thread: {:?}", e);
273        }
274    }
275
276    /// Returns `true` while this manager has not been shut down.
277    ///
278    /// Specifically, this returns `true` from construction until
279    /// [`shutdown`](Self::shutdown) is invoked. It does **not** prove that
280    /// any daemon thread is currently alive: a freshly-constructed manager
281    /// (before [`start_daemons`](Self::start_daemons) is called) reports
282    /// `true` here while [`running_count`](Self::running_count) returns 0.
283    ///
284    /// This semantic is codified by `test_daemon_manager_creation`, which
285    /// asserts both `is_running() == true` and `running_count() == 0`
286    /// before any daemons are started. Use `running_count()` if you need
287    /// the actual count of spawned daemon threads.
288    pub fn is_running(&self) -> bool {
289        // NB: name is historical. We return `!shutdown_requested` rather
290        // than checking the JoinHandles so that the post-`new`/pre-`start`
291        // contract above remains stable.
292        !self.shutdown.load(Ordering::Relaxed)
293    }
294
295    /// Returns the number of running daemons.
296    pub fn running_count(&self) -> usize {
297        let mut count = 0;
298        if self.evictor_enabled && self.evictor_handle.is_some() {
299            count += 1;
300        }
301        if self.cleaner_enabled && self.cleaner_handle.is_some() {
302            count += 1;
303        }
304        if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
305            count += 1;
306        }
307        count
308    }
309}
310
311impl Drop for DaemonManager {
312    fn drop(&mut self) {
313        // Ensure clean shutdown
314        if self.is_running() {
315            self.shutdown();
316        }
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;
    use std::time::Instant;

    /// Builds the evictor / cleaner / checkpointer trio used by the
    /// start/shutdown tests.
    fn make_subsystems() -> (Arc<Evictor>, Arc<Cleaner>, Arc<Checkpointer>) {
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        (
            Arc::new(Evictor::new(arbiter, 100, false)),
            Arc::new(Cleaner::new(50, 5, 0)),
            Arc::new(Checkpointer::new(CheckpointConfig::default())),
        )
    }

    /// Default config with every daemon waking up every `interval_ms`.
    fn config_with_interval(interval_ms: u64) -> EngineConfig {
        EngineConfig::default()
            .evictor_wakeup_interval_ms(interval_ms)
            .cleaner_wakeup_interval_ms(interval_ms)
            .checkpointer_wakeup_interval_ms(interval_ms)
    }

    /// Starts the enabled daemons of `manager` using fresh subsystems.
    fn start(manager: &mut DaemonManager) {
        let (evictor, cleaner, checkpointer) = make_subsystems();
        manager.start_daemons(evictor, cleaner, checkpointer);
    }

    #[test]
    fn test_daemon_manager_creation() {
        let manager = DaemonManager::new(&EngineConfig::default());

        // The default config enables every daemon...
        assert!(manager.evictor_enabled);
        assert!(manager.cleaner_enabled);
        assert!(manager.checkpointer_enabled);
        // ...but nothing is spawned before start_daemons().
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 0);
    }

    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let manager = DaemonManager::new(&config);

        assert!(!manager.evictor_enabled);
        assert!(!manager.cleaner_enabled);
        assert!(!manager.checkpointer_enabled);
    }

    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let mut manager = DaemonManager::new(&config_with_interval(100));
        start(&mut manager);

        // Let the threads reach their first wait.
        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 3);

        manager.shutdown();
        assert!(!manager.is_running());
    }

    #[test]
    fn test_daemon_manager_selective_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        let mut manager = DaemonManager::new(&config);
        start(&mut manager);

        thread::sleep(Duration::from_millis(50));
        // The cleaner is disabled, so only the evictor and checkpointer run.
        assert_eq!(manager.running_count(), 2);

        manager.shutdown();
    }

    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let mut manager = DaemonManager::new(&config_with_interval(100));
        start(&mut manager);

        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());

        // Dropping without an explicit shutdown() must stop the daemons.
        drop(manager);
    }

    #[test]
    fn test_daemon_wakeup_intervals() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);
        let manager = DaemonManager::new(&config);

        assert_eq!(manager.evictor_wakeup_ms, 1000);
        assert_eq!(manager.cleaner_wakeup_ms, 2000);
        assert_eq!(manager.checkpointer_wakeup_ms, 3000);
    }

    /// Shutdown must return quickly even with a long (5 s) wakeup interval.
    /// This only holds if the condvar notification actually wakes the daemons.
    #[test]
    fn test_shutdown_wakes_daemons_early() {
        let mut manager = DaemonManager::new(&config_with_interval(5000));
        start(&mut manager);

        // Give the threads a moment to enter their wait.
        thread::sleep(Duration::from_millis(50));

        let started = Instant::now();
        manager.shutdown();
        let elapsed = started.elapsed();

        assert!(
            elapsed < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            elapsed
        );
    }

    #[test]
    fn test_wake_handle_timeout() {
        // Nobody notifies, so the wait must time out and report `false`.
        let notified = WakeHandle::new().wait_timeout(Duration::from_millis(50));
        assert!(!notified);
    }

    #[test]
    fn test_wake_handle_notify() {
        let waiter = WakeHandle::new();
        let notifier = waiter.clone();

        // Notify from another thread after a short delay.
        let worker = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            notifier.notify();
        });

        let started = Instant::now();
        // Allow up to 5 s; the notification should arrive after ~20 ms.
        let notified = waiter.wait_timeout(Duration::from_secs(5));
        let elapsed = started.elapsed();

        worker.join().unwrap();

        assert!(notified, "expected notify to return true");
        assert!(
            elapsed < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            elapsed
        );
    }
}