Skip to main content

noxu_engine/
daemon_manager.rs

//! Background daemon lifecycle management.
//!
//! # DST M1.1 note: no injectable Clock is threaded here (deliberate)
//!
//! Each daemon's wakeup interval is a *config-supplied* millisecond count
//! (stored as `evictor_wakeup_ms` / `cleaner_wakeup_ms` /
//! `checkpointer_wakeup_ms`) that is converted to a `Duration` and passed
//! straight to [`WakeHandle::wait_timeout`]; there is no hardcoded
//! `std::time` control-flow site to virtualise.  The shutdown path's liveness
//! is **notify-driven** (`shutdown()` sets the flag and calls
//! `WakeHandle::notify`), not timeout-driven — which is exactly why the
//! Milestone-2 shuttle gate (`tests/shuttle_daemon_shutdown.rs`) proves it
//! deadlock-free without any clock injection.  A `SimClock` would add nothing
//! that the config-supplied interval and the notify seam do not already give,
//! so the Clock is intentionally NOT threaded through the daemon loops
//! (matches the M1 deferral rationale).
16
17use crate::engine_config::EngineConfig;
18use noxu_cleaner::Cleaner;
19use noxu_evictor::{EvictionSource, Evictor};
20use noxu_recovery::Checkpointer;
21use noxu_util::dst_sync::atomic::{AtomicBool, Ordering};
22use noxu_util::dst_sync::{Arc, Condvar, Mutex, thread};
23use std::time::Duration;
24
/// A wakeup latch that lets a daemon thread sleep with early exit on shutdown.
///
/// Each daemon receives a clone of its handle; all clones share a single
/// `(Mutex<bool>, Condvar)` pair.  The `bool` is the "notified" flag: it
/// starts `false`, is set to `true` by [`notify`](Self::notify), and is never
/// cleared again, so once a handle has been notified every later
/// [`wait_timeout`](Self::wait_timeout) returns `true` without blocking.
///
/// The type is `pub` but `#[doc(hidden)]`: it is exposed so the DST
/// Milestone 2 shuttle test (`tests/shuttle_daemon_shutdown.rs`, gated behind
/// `--cfg noxu_shuttle`) can drive the real sleep/notify coordination through
/// shuttle's scheduler.  The constructor stays crate-private.
#[derive(Clone)]
#[doc(hidden)]
pub struct WakeHandle {
    /// Shared "notified" flag plus the condvar that sleepers block on.
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    /// Creates a handle in the un-notified state.
    pub(crate) fn new() -> Self {
        Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Sleep for up to `duration`, but return early once `notify()` is called.
    ///
    /// Returns `true` if the handle has been notified (before or during the
    /// wait), `false` if `duration` elapsed without a notification.
    ///
    /// The wait is predicate-driven (`wait_timeout_while` on the notify flag),
    /// which gives two guarantees:
    ///
    /// * **No lost wakeup.**  The flag is checked under the mutex *before*
    ///   blocking, so a `notify()` that landed between the caller's previous
    ///   loop iteration and this call is observed immediately.  Without that
    ///   check the daemon would block for the full `duration` despite an
    ///   already-pending notify — in production merely a shutdown *stall* up
    ///   to `duration`, but a hang under the DST shuttle scheduler, whose
    ///   `wait_timeout` never times out.  (Surfaced by the Milestone-2 shuttle
    ///   gate, `tests/shuttle_daemon_shutdown.rs`.)
    /// * **No early `false`.**  A spurious condvar wakeup re-enters the wait
    ///   for the remaining time instead of being reported as an elapsed
    ///   timeout, so callers do not run a work cycle ahead of schedule.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[doc(hidden)]
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (lock, cvar) = &*self.pair;
        let guard = lock.lock().unwrap();
        // NOTE(review): relies on `dst_sync::Condvar` exposing
        // `wait_timeout_while` the way `std::sync::Condvar` does — confirm for
        // the shuttle build.
        let (guard, _timeout_result) = cvar
            .wait_timeout_while(guard, duration, |notified| !*notified)
            .unwrap();
        // The flag, not the timeout result, is the source of truth: a notify
        // that races with the deadline still counts as a notification.
        *guard
    }

    /// Notify the sleeping daemon to wake up immediately.
    ///
    /// Sets the shared flag under the mutex and then wakes every waiter, so
    /// the notification is seen both by threads currently blocked in
    /// [`wait_timeout`](Self::wait_timeout) and by threads that call it later.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[doc(hidden)]
    pub fn notify(&self) {
        let (lock, cvar) = &*self.pair;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }
}
78
#[cfg(noxu_shuttle)]
impl WakeHandle {
    /// Public constructor for builds compiled with `--cfg noxu_shuttle`.
    ///
    /// Produces exactly what the crate-private [`WakeHandle::new`] produces;
    /// it exists because `new` itself is not `pub`.
    #[doc(hidden)]
    pub fn new_for_shuttle() -> Self {
        WakeHandle::new()
    }
}
87
88/// Manages the lifecycle of background daemon threads.
89///
90/// The DaemonManager is responsible for:
91/// - Starting daemon threads (evictor, cleaner, checkpointer)
92/// - Coordinating shutdown of all daemons
93/// - Tracking daemon running state
94///
95/// Each daemon runs in its own thread, periodically waking up to perform work.
96/// On shutdown, daemons are notified via a Condvar so they exit immediately
97/// instead of sleeping through their full wakeup interval.
98pub struct DaemonManager {
99    /// Shutdown signal shared by all daemon threads.
100    shutdown: Arc<AtomicBool>,
101
102    /// Wakeup handles for each daemon (used to unblock their sleep on shutdown).
103    evictor_wake: WakeHandle,
104    cleaner_wake: WakeHandle,
105    checkpointer_wake: WakeHandle,
106
107    /// Evictor daemon thread handle.
108    evictor_handle: Option<thread::JoinHandle<()>>,
109
110    /// Cleaner daemon thread handle.
111    cleaner_handle: Option<thread::JoinHandle<()>>,
112
113    /// Checkpointer daemon thread handle.
114    checkpointer_handle: Option<thread::JoinHandle<()>>,
115
116    /// Whether evictor is enabled.
117    evictor_enabled: bool,
118
119    /// Whether cleaner is enabled.
120    cleaner_enabled: bool,
121
122    /// Whether checkpointer is enabled.
123    checkpointer_enabled: bool,
124
125    /// Evictor wakeup interval.
126    evictor_wakeup_ms: u64,
127
128    /// Cleaner wakeup interval.
129    cleaner_wakeup_ms: u64,
130
131    /// Checkpointer wakeup interval.
132    checkpointer_wakeup_ms: u64,
133}
134
135impl DaemonManager {
136    /// Creates a new DaemonManager from the given configuration.
137    ///
138    /// Daemons are not started until `start_daemons()` is called.
139    pub fn new(config: &EngineConfig) -> Self {
140        Self {
141            shutdown: Arc::new(AtomicBool::new(false)),
142            evictor_wake: WakeHandle::new(),
143            cleaner_wake: WakeHandle::new(),
144            checkpointer_wake: WakeHandle::new(),
145            evictor_handle: None,
146            cleaner_handle: None,
147            checkpointer_handle: None,
148            evictor_enabled: config.evictor_enabled,
149            cleaner_enabled: config.cleaner_enabled,
150            checkpointer_enabled: config.checkpointer_enabled,
151            evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
152            cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
153            checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
154        }
155    }
156
157    /// Starts all enabled daemon threads.
158    ///
159    /// Each daemon runs in a loop:
160    /// 1. Sleep for its wakeup interval
161    /// 2. Check shutdown flag
162    /// 3. Perform work (eviction, cleaning, checkpoint)
163    /// 4. Repeat
164    ///
165    /// # Arguments
166    /// * `evictor` - The evictor to use for eviction operations
167    /// * `cleaner` - The cleaner to use for cleaning operations
168    /// * `checkpointer` - The checkpointer to use for checkpoint operations
169    pub fn start_daemons(
170        &mut self,
171        evictor: Arc<Evictor>,
172        cleaner: Arc<Cleaner>,
173        checkpointer: Arc<Checkpointer>,
174    ) {
175        // Start evictor daemon
176        if self.evictor_enabled {
177            let shutdown = Arc::clone(&self.shutdown);
178            let wakeup_ms = self.evictor_wakeup_ms;
179            let evictor = Arc::clone(&evictor);
180            let wake = self.evictor_wake.clone();
181
182            self.evictor_handle = Some(thread::spawn(move || {
183                log::info!("Evictor daemon started");
184                while !shutdown.load(Ordering::Relaxed) {
185                    // Sleep for the wakeup interval, but return early on shutdown.
186                    let notified =
187                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
188                    if notified || shutdown.load(Ordering::Relaxed) {
189                        break;
190                    }
191
192                    // Perform eviction
193                    let result = evictor.do_evict(EvictionSource::Daemon);
194                    if result.nodes_evicted > 0 {
195                        log::debug!(
196                            "Evictor: evicted {} nodes, {} bytes",
197                            result.nodes_evicted,
198                            result.bytes_evicted
199                        );
200                    }
201                }
202                log::info!("Evictor daemon stopped");
203            }));
204        }
205
206        // Start cleaner daemon
207        if self.cleaner_enabled {
208            let shutdown = Arc::clone(&self.shutdown);
209            let wakeup_ms = self.cleaner_wakeup_ms;
210            let cleaner = Arc::clone(&cleaner);
211            let wake = self.cleaner_wake.clone();
212
213            self.cleaner_handle = Some(thread::spawn(move || {
214                log::info!("Cleaner daemon started");
215                while !shutdown.load(Ordering::Relaxed) {
216                    // Sleep for the wakeup interval, but return early on shutdown.
217                    let notified =
218                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
219                    if notified || shutdown.load(Ordering::Relaxed) {
220                        break;
221                    }
222
223                    // Perform cleaning
224                    match cleaner.do_clean(1, false) {
225                        Ok(result) => {
226                            if result.files_cleaned > 0 {
227                                log::debug!(
228                                    "Cleaner: cleaned {} files, deleted {} files",
229                                    result.files_cleaned,
230                                    result.files_deleted
231                                );
232                            }
233                        }
234                        Err(e) => {
235                            log::warn!("Cleaner error: {}", e);
236                        }
237                    }
238                }
239                log::info!("Cleaner daemon stopped");
240            }));
241        }
242
243        // Start checkpointer daemon
244        if self.checkpointer_enabled {
245            let shutdown = Arc::clone(&self.shutdown);
246            let wakeup_ms = self.checkpointer_wakeup_ms;
247            let checkpointer = Arc::clone(&checkpointer);
248            let wake = self.checkpointer_wake.clone();
249
250            self.checkpointer_handle = Some(thread::spawn(move || {
251                log::info!("Checkpointer daemon started");
252                while !shutdown.load(Ordering::Relaxed) {
253                    // Sleep for the wakeup interval, but return early on shutdown.
254                    let notified =
255                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
256                    if notified || shutdown.load(Ordering::Relaxed) {
257                        break;
258                    }
259
260                    // JE Checkpointer.isRunnable: skip the periodic checkpoint
261                    // on an idle environment (nothing written since the last
262                    // one) instead of writing a CheckpointEnd every wakeup.
263                    if !checkpointer.is_runnable(false) {
264                        continue;
265                    }
266                    // Perform checkpoint
267                    match checkpointer.do_checkpoint("daemon") {
268                        Ok(result) => {
269                            log::debug!(
270                                "Checkpoint: id={}, flushed {} nodes",
271                                result.checkpoint_id,
272                                result.total_nodes_flushed()
273                            );
274                        }
275                        Err(e) => {
276                            log::warn!("Checkpoint error: {}", e);
277                        }
278                    }
279                }
280                log::info!("Checkpointer daemon stopped");
281            }));
282        }
283    }
284
285    /// Signals shutdown and waits for all daemon threads to complete.
286    ///
287    /// Shutdown order mirrors JE `EnvironmentImpl.shutdownDaemons`:
288    ///   1. Set the shutdown flag and wake all sleeping daemons.
289    ///   2. Join the **cleaner** first — it can call the checkpointer
290    ///      internally, so it must stop before the checkpointer stops.
291    ///   3. Join the **checkpointer** — must stop before the evictor, because
292    ///      the final checkpoint must complete while the evictor is still able
293    ///      to flush dirty nodes that other daemons produce.
294    ///   4. Join the **evictor** last — it remains available to flush dirty
295    ///      nodes until all other daemons have exited.
296    ///
297    /// JE citation: `EnvironmentImpl.shutdownDaemons` comment:
298    ///   "Cleaner has to be shutdown before checkpointer because former
299    ///   calls the latter."
300    pub fn shutdown(&mut self) {
301        // Step 1: signal shutdown and wake all sleeping daemons immediately
302        // so they do not wait out their full sleep interval.
303        self.shutdown.store(true, Ordering::Relaxed);
304        self.cleaner_wake.notify();
305        self.checkpointer_wake.notify();
306        self.evictor_wake.notify();
307
308        // Step 2: join cleaner first (it may call checkpointer internally).
309        if let Some(handle) = self.cleaner_handle.take()
310            && let Err(e) = handle.join()
311        {
312            log::error!("Failed to join cleaner thread: {:?}", e);
313        }
314
315        // Step 3: join checkpointer after cleaner has stopped.
316        if let Some(handle) = self.checkpointer_handle.take()
317            && let Err(e) = handle.join()
318        {
319            log::error!("Failed to join checkpointer thread: {:?}", e);
320        }
321
322        // Step 4: join evictor last — it must remain available until
323        // the checkpoint completes so dirty nodes can be flushed.
324        if let Some(handle) = self.evictor_handle.take()
325            && let Err(e) = handle.join()
326        {
327            log::error!("Failed to join evictor thread: {:?}", e);
328        }
329    }
330
331    /// Returns `true` while this manager has not been shut down.
332    ///
333    /// Specifically, this returns `true` from construction until
334    /// [`shutdown`](Self::shutdown) is invoked. It does **not** prove that
335    /// any daemon thread is currently alive: a freshly-constructed manager
336    /// (before [`start_daemons`](Self::start_daemons) is called) reports
337    /// `true` here while [`running_count`](Self::running_count) returns 0.
338    ///
339    /// This semantic is codified by `test_daemon_manager_creation`, which
340    /// asserts both `is_running() == true` and `running_count() == 0`
341    /// before any daemons are started. Use `running_count()` if you need
342    /// the actual count of spawned daemon threads.
343    pub fn is_running(&self) -> bool {
344        // NB: name is historical. We return `!shutdown_requested` rather
345        // than checking the JoinHandles so that the post-`new`/pre-`start`
346        // contract above remains stable.
347        !self.shutdown.load(Ordering::Relaxed)
348    }
349
350    /// Returns the number of running daemons.
351    pub fn running_count(&self) -> usize {
352        let mut count = 0;
353        if self.evictor_enabled && self.evictor_handle.is_some() {
354            count += 1;
355        }
356        if self.cleaner_enabled && self.cleaner_handle.is_some() {
357            count += 1;
358        }
359        if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
360            count += 1;
361        }
362        count
363    }
364}
365
366impl Drop for DaemonManager {
367    fn drop(&mut self) {
368        // Ensure clean shutdown
369        if self.is_running() {
370            self.shutdown();
371        }
372    }
373}
374
/// Test hook for DST Milestone 2 (Phase 2a).
///
/// Re-exports the internal sleep/notify handle so that the shuttle test
/// (`tests/shuttle_daemon_shutdown.rs`) can exercise the real
/// daemon-loop-vs-shutdown coordination under shuttle's scheduler.  The
/// module exists only when the crate is built with `--cfg noxu_shuttle`;
/// every other build does not contain it.
#[cfg(noxu_shuttle)]
pub mod dst_hooks {
    pub use super::WakeHandle;
}
383
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;
    use std::time::Instant;

    /// The three subsystems a `DaemonManager` drives.
    type Subsystems = (Arc<Evictor>, Arc<Cleaner>, Arc<Checkpointer>);

    /// A one-shot gate used by the CC-3 ordering test: a flag plus the
    /// condvar that threads block on until the flag is set.
    type Gate = (std::sync::Mutex<bool>, std::sync::Condvar);

    /// Builds the evictor / cleaner / checkpointer fixtures shared by the
    /// tests that actually start daemon threads.
    fn make_subsystems() -> Subsystems {
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));
        (evictor, cleaner, checkpointer)
    }

    /// Creates a manager for `config` and starts its daemons on fresh
    /// fixtures.
    fn started_manager(config: &EngineConfig) -> DaemonManager {
        let mut manager = DaemonManager::new(config);
        let (evictor, cleaner, checkpointer) = make_subsystems();
        manager.start_daemons(evictor, cleaner, checkpointer);
        manager
    }

    /// Opens `gate` and wakes every thread blocked on it.
    fn open_gate(gate: &Gate) {
        let (flag, cv) = gate;
        *flag.lock().unwrap() = true;
        cv.notify_all();
    }

    /// Blocks the calling thread until `gate` has been opened.
    fn wait_for_gate(gate: &Gate) {
        let (flag, cv) = gate;
        let opened = flag.lock().unwrap();
        drop(cv.wait_while(opened, |is_open| !*is_open).unwrap());
    }

    /// Spawns a stand-in daemon for the CC-3 ordering test: it sleeps on
    /// `wake` until `shutdown` is set, then — if `exit_gate` is given —
    /// refuses to exit until that gate has been opened.
    fn spawn_sim_daemon(
        shutdown: &Arc<AtomicBool>,
        wake: &WakeHandle,
        exit_gate: Option<Arc<Gate>>,
    ) -> thread::JoinHandle<()> {
        let shutdown = shutdown.clone();
        let wake = wake.clone();
        thread::spawn(move || {
            while !shutdown.load(Ordering::Relaxed) {
                wake.wait_timeout(Duration::from_millis(5000));
            }
            if let Some(gate) = exit_gate {
                wait_for_gate(&gate);
            }
        })
    }

    /// A default-configured manager has every daemon enabled, reports
    /// "running" (not shut down), and has spawned nothing yet.
    #[test]
    fn test_daemon_manager_creation() {
        let manager = DaemonManager::new(&EngineConfig::default());

        assert!(manager.evictor_enabled);
        assert!(manager.cleaner_enabled);
        assert!(manager.checkpointer_enabled);
        assert!(manager.is_running());
        // Nothing has been started, so no thread is counted.
        assert_eq!(manager.running_count(), 0);
    }

    /// The enable switches from the config are carried into the manager.
    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let manager = DaemonManager::new(&config);

        assert!(!manager.evictor_enabled);
        assert!(!manager.cleaner_enabled);
        assert!(!manager.checkpointer_enabled);
    }

    /// Starting spawns all three daemons; shutdown flips `is_running`.
    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = started_manager(&config);

        // Let the threads get going before inspecting the manager.
        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 3);

        manager.shutdown();
        assert!(!manager.is_running());
    }

    /// Only the enabled daemons are spawned and counted.
    #[test]
    fn test_daemon_manager_selective_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = started_manager(&config);

        thread::sleep(Duration::from_millis(50));
        // Cleaner is disabled: evictor + checkpointer only.
        assert_eq!(manager.running_count(), 2);

        manager.shutdown();
    }

    /// Dropping a running manager must shut the daemons down cleanly.
    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let manager = started_manager(&config);

        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());

        // `Drop` performs the shutdown.
        drop(manager);
    }

    /// The wakeup intervals from the config are stored unchanged.
    #[test]
    fn test_daemon_wakeup_intervals() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);

        let manager = DaemonManager::new(&config);
        assert_eq!(manager.evictor_wakeup_ms, 1000);
        assert_eq!(manager.cleaner_wakeup_ms, 2000);
        assert_eq!(manager.checkpointer_wakeup_ms, 3000);
    }

    /// Shutdown must return promptly even though every daemon is configured
    /// to sleep for 5 seconds: the condvar notification has to cut the sleep
    /// short.
    #[test]
    fn test_shutdown_wakes_daemons_early() {
        // 5-second interval; shutdown has to beat it by a wide margin.
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(5000)
            .cleaner_wakeup_interval_ms(5000)
            .checkpointer_wakeup_interval_ms(5000);

        let mut manager = started_manager(&config);

        // Allow the threads to reach their wait.
        thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();

        // Well under the 5 s sleep.
        assert!(
            elapsed < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            elapsed
        );
    }

    /// Without a notification the wait runs to its timeout and reports
    /// `false`.
    #[test]
    fn test_wake_handle_timeout() {
        let handle = WakeHandle::new();
        assert!(!handle.wait_timeout(Duration::from_millis(50)));
    }

    /// A notification from another thread ends the wait early and reports
    /// `true`.
    #[test]
    fn test_wake_handle_notify() {
        let handle = WakeHandle::new();
        let remote = handle.clone();

        // Notifier fires after a short delay.
        let notifier = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            remote.notify();
        });

        let start = Instant::now();
        // Up to 5 seconds allowed; the notify should land ~20 ms in.
        let notified = handle.wait_timeout(Duration::from_secs(5));
        let elapsed = start.elapsed();

        notifier.join().unwrap();

        assert!(notified, "expected notify to return true");
        assert!(
            elapsed < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            elapsed
        );
    }

    // -----------------------------------------------------------------------
    // CC-3: JE-correct shutdown order (cleaner → checkpointer → evictor)
    // -----------------------------------------------------------------------

    /// Models the JE-mandated stop order cleaner → checkpointer → evictor.
    ///
    /// The test builds three stand-in daemon threads on real `WakeHandle`s
    /// and performs the signal / notify / join sequence by hand (it does not
    /// call `DaemonManager::shutdown`).  The stand-ins are chained so that a
    /// wrong join order cannot finish: the cleaner exits as soon as shutdown
    /// is signalled, the checkpointer refuses to exit until the cleaner has
    /// been joined, and the evictor refuses to exit until the checkpointer
    /// has been joined.
    ///
    /// The calling thread records each completed join, and the recorded
    /// order plus a bounded elapsed time are asserted at the end.
    ///
    /// JE reference: `EnvironmentImpl.shutdownDaemons` — "Cleaner has to be
    /// shutdown before checkpointer because former calls the latter."
    #[test]
    fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
        // Join-completion order, recorded by the calling thread.
        let mut join_order: Vec<&'static str> = Vec::new();

        let shutdown_flag = Arc::new(AtomicBool::new(false));

        // Gates: joining the cleaner releases the checkpointer; joining the
        // checkpointer releases the evictor.
        let cleaner_joined: Arc<Gate> = Arc::new((
            std::sync::Mutex::new(false),
            std::sync::Condvar::new(),
        ));
        let checkpointer_joined: Arc<Gate> = Arc::new((
            std::sync::Mutex::new(false),
            std::sync::Condvar::new(),
        ));

        let wake_c = WakeHandle::new();
        let wake_cp = WakeHandle::new();
        let wake_ev = WakeHandle::new();

        // Cleaner: no exit gate, so it is the first thread able to finish.
        let cleaner_t = spawn_sim_daemon(&shutdown_flag, &wake_c, None);
        // Checkpointer: held back until the cleaner has been joined.
        let checkpointer_t = spawn_sim_daemon(
            &shutdown_flag,
            &wake_cp,
            Some(cleaner_joined.clone()),
        );
        // Evictor: held back until the checkpointer has been joined.
        let evictor_t = spawn_sim_daemon(
            &shutdown_flag,
            &wake_ev,
            Some(checkpointer_joined.clone()),
        );

        // Simulated shutdown: raise the flag, then wake everyone.
        shutdown_flag.store(true, Ordering::Relaxed);
        wake_c.notify();
        wake_cp.notify();
        wake_ev.notify();

        let start = Instant::now();

        // 1. cleaner
        cleaner_t.join().unwrap();
        join_order.push("cleaner");
        open_gate(&cleaner_joined);

        // 2. checkpointer
        checkpointer_t.join().unwrap();
        join_order.push("checkpointer");
        open_gate(&checkpointer_joined);

        // 3. evictor
        evictor_t.join().unwrap();
        join_order.push("evictor");

        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown stalled: {:?}",
            elapsed
        );

        assert_eq!(
            join_order,
            vec!["cleaner", "checkpointer", "evictor"],
            "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
        );
    }

    /// With very long wakeup intervals, `shutdown()` must still finish in
    /// bounded time and must not deadlock while joining the daemons one
    /// after another.
    #[test]
    fn test_cc3_shutdown_no_deadlock_bounded_time() {
        // 10-second intervals; only the condvar wake can make this fast.
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(10_000)
            .cleaner_wakeup_interval_ms(10_000)
            .checkpointer_wakeup_interval_ms(10_000);

        let mut manager = started_manager(&config);
        thread::sleep(Duration::from_millis(30));

        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown deadlocked or stalled: took {:?}",
            elapsed
        );
        assert!(!manager.is_running());
    }
}