Skip to main content

noxu_engine/
daemon_manager.rs

1//! Background daemon lifecycle management.
2
3use crate::engine_config::EngineConfig;
4use noxu_cleaner::Cleaner;
5use noxu_evictor::{EvictionSource, Evictor};
6use noxu_recovery::Checkpointer;
7use noxu_util::dst_sync::atomic::{AtomicBool, Ordering};
8use noxu_util::dst_sync::{Arc, Condvar, Mutex, thread};
9use std::time::Duration;
10
11/// A wakeup handle used by daemon threads to sleep with early-exit on shutdown.
12///
13/// Each daemon receives a clone of this handle. When `notify()` is called
14/// (at shutdown), the daemon wakes from its sleep immediately rather than
15/// waiting for the full interval to elapse.
16///
17/// `pub(crate)` so the DST Milestone 2 shuttle test
18/// (`tests/shuttle_daemon_shutdown.rs`, gated behind `--cfg noxu_shuttle`) can
19/// drive the real sleep/notify coordination through shuttle's scheduler.
20#[derive(Clone)]
21#[doc(hidden)]
22pub struct WakeHandle {
23    pair: Arc<(Mutex<bool>, Condvar)>,
24}
25
26impl WakeHandle {
27    pub(crate) fn new() -> Self {
28        Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
29    }
30
31    /// Sleep for `duration`, but return early if `notify()` is called.
32    ///
33    /// Returns `true` if the wakeup was triggered by a shutdown notification,
34    /// `false` if the timeout elapsed normally.
35    ///
36    /// The notify flag is checked *before* blocking on the condvar: a
37    /// `notify()` that lands between the caller's previous loop iteration and
38    /// this call sets the flag under the mutex, and `notify_all` on a condvar
39    /// with no waiter is a no-op.  Without the pre-check the daemon would block
40    /// for the full `duration` despite an already-pending notify (a lost
41    /// wakeup) — in production merely a shutdown *stall* up to `duration`, but a
42    /// hang under the DST shuttle scheduler, whose `wait_timeout` never times
43    /// out.  The pre-check closes that window.  (Surfaced by the Milestone-2
44    /// shuttle gate, `tests/shuttle_daemon_shutdown.rs`.)
45    #[doc(hidden)]
46    pub fn wait_timeout(&self, duration: Duration) -> bool {
47        let (lock, cvar) = &*self.pair;
48        let guard = lock.lock().unwrap();
49        if *guard {
50            return true;
51        }
52        let (guard, _) = cvar.wait_timeout(guard, duration).unwrap();
53        *guard
54    }
55
56    /// Notify the sleeping daemon to wake up immediately.
57    #[doc(hidden)]
58    pub fn notify(&self) {
59        let (lock, cvar) = &*self.pair;
60        *lock.lock().unwrap() = true;
61        cvar.notify_all();
62    }
63}
64
65#[cfg(noxu_shuttle)]
66impl WakeHandle {
67    /// Shuttle-test constructor (mirrors [`WakeHandle::new`]).
68    #[doc(hidden)]
69    pub fn new_for_shuttle() -> Self {
70        Self::new()
71    }
72}
73
74/// Manages the lifecycle of background daemon threads.
75///
76/// The DaemonManager is responsible for:
77/// - Starting daemon threads (evictor, cleaner, checkpointer)
78/// - Coordinating shutdown of all daemons
79/// - Tracking daemon running state
80///
81/// Each daemon runs in its own thread, periodically waking up to perform work.
82/// On shutdown, daemons are notified via a Condvar so they exit immediately
83/// instead of sleeping through their full wakeup interval.
84pub struct DaemonManager {
85    /// Shutdown signal shared by all daemon threads.
86    shutdown: Arc<AtomicBool>,
87
88    /// Wakeup handles for each daemon (used to unblock their sleep on shutdown).
89    evictor_wake: WakeHandle,
90    cleaner_wake: WakeHandle,
91    checkpointer_wake: WakeHandle,
92
93    /// Evictor daemon thread handle.
94    evictor_handle: Option<thread::JoinHandle<()>>,
95
96    /// Cleaner daemon thread handle.
97    cleaner_handle: Option<thread::JoinHandle<()>>,
98
99    /// Checkpointer daemon thread handle.
100    checkpointer_handle: Option<thread::JoinHandle<()>>,
101
102    /// Whether evictor is enabled.
103    evictor_enabled: bool,
104
105    /// Whether cleaner is enabled.
106    cleaner_enabled: bool,
107
108    /// Whether checkpointer is enabled.
109    checkpointer_enabled: bool,
110
111    /// Evictor wakeup interval.
112    evictor_wakeup_ms: u64,
113
114    /// Cleaner wakeup interval.
115    cleaner_wakeup_ms: u64,
116
117    /// Checkpointer wakeup interval.
118    checkpointer_wakeup_ms: u64,
119}
120
121impl DaemonManager {
122    /// Creates a new DaemonManager from the given configuration.
123    ///
124    /// Daemons are not started until `start_daemons()` is called.
125    pub fn new(config: &EngineConfig) -> Self {
126        Self {
127            shutdown: Arc::new(AtomicBool::new(false)),
128            evictor_wake: WakeHandle::new(),
129            cleaner_wake: WakeHandle::new(),
130            checkpointer_wake: WakeHandle::new(),
131            evictor_handle: None,
132            cleaner_handle: None,
133            checkpointer_handle: None,
134            evictor_enabled: config.evictor_enabled,
135            cleaner_enabled: config.cleaner_enabled,
136            checkpointer_enabled: config.checkpointer_enabled,
137            evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
138            cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
139            checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
140        }
141    }
142
143    /// Starts all enabled daemon threads.
144    ///
145    /// Each daemon runs in a loop:
146    /// 1. Sleep for its wakeup interval
147    /// 2. Check shutdown flag
148    /// 3. Perform work (eviction, cleaning, checkpoint)
149    /// 4. Repeat
150    ///
151    /// # Arguments
152    /// * `evictor` - The evictor to use for eviction operations
153    /// * `cleaner` - The cleaner to use for cleaning operations
154    /// * `checkpointer` - The checkpointer to use for checkpoint operations
155    pub fn start_daemons(
156        &mut self,
157        evictor: Arc<Evictor>,
158        cleaner: Arc<Cleaner>,
159        checkpointer: Arc<Checkpointer>,
160    ) {
161        // Start evictor daemon
162        if self.evictor_enabled {
163            let shutdown = Arc::clone(&self.shutdown);
164            let wakeup_ms = self.evictor_wakeup_ms;
165            let evictor = Arc::clone(&evictor);
166            let wake = self.evictor_wake.clone();
167
168            self.evictor_handle = Some(thread::spawn(move || {
169                log::info!("Evictor daemon started");
170                while !shutdown.load(Ordering::Relaxed) {
171                    // Sleep for the wakeup interval, but return early on shutdown.
172                    let notified =
173                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
174                    if notified || shutdown.load(Ordering::Relaxed) {
175                        break;
176                    }
177
178                    // Perform eviction
179                    let result = evictor.do_evict(EvictionSource::Daemon);
180                    if result.nodes_evicted > 0 {
181                        log::debug!(
182                            "Evictor: evicted {} nodes, {} bytes",
183                            result.nodes_evicted,
184                            result.bytes_evicted
185                        );
186                    }
187                }
188                log::info!("Evictor daemon stopped");
189            }));
190        }
191
192        // Start cleaner daemon
193        if self.cleaner_enabled {
194            let shutdown = Arc::clone(&self.shutdown);
195            let wakeup_ms = self.cleaner_wakeup_ms;
196            let cleaner = Arc::clone(&cleaner);
197            let wake = self.cleaner_wake.clone();
198
199            self.cleaner_handle = Some(thread::spawn(move || {
200                log::info!("Cleaner daemon started");
201                while !shutdown.load(Ordering::Relaxed) {
202                    // Sleep for the wakeup interval, but return early on shutdown.
203                    let notified =
204                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
205                    if notified || shutdown.load(Ordering::Relaxed) {
206                        break;
207                    }
208
209                    // Perform cleaning
210                    match cleaner.do_clean(1, false) {
211                        Ok(result) => {
212                            if result.files_cleaned > 0 {
213                                log::debug!(
214                                    "Cleaner: cleaned {} files, deleted {} files",
215                                    result.files_cleaned,
216                                    result.files_deleted
217                                );
218                            }
219                        }
220                        Err(e) => {
221                            log::warn!("Cleaner error: {}", e);
222                        }
223                    }
224                }
225                log::info!("Cleaner daemon stopped");
226            }));
227        }
228
229        // Start checkpointer daemon
230        if self.checkpointer_enabled {
231            let shutdown = Arc::clone(&self.shutdown);
232            let wakeup_ms = self.checkpointer_wakeup_ms;
233            let checkpointer = Arc::clone(&checkpointer);
234            let wake = self.checkpointer_wake.clone();
235
236            self.checkpointer_handle = Some(thread::spawn(move || {
237                log::info!("Checkpointer daemon started");
238                while !shutdown.load(Ordering::Relaxed) {
239                    // Sleep for the wakeup interval, but return early on shutdown.
240                    let notified =
241                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
242                    if notified || shutdown.load(Ordering::Relaxed) {
243                        break;
244                    }
245
246                    // JE Checkpointer.isRunnable: skip the periodic checkpoint
247                    // on an idle environment (nothing written since the last
248                    // one) instead of writing a CheckpointEnd every wakeup.
249                    if !checkpointer.is_runnable(false) {
250                        continue;
251                    }
252                    // Perform checkpoint
253                    match checkpointer.do_checkpoint("daemon") {
254                        Ok(result) => {
255                            log::debug!(
256                                "Checkpoint: id={}, flushed {} nodes",
257                                result.checkpoint_id,
258                                result.total_nodes_flushed()
259                            );
260                        }
261                        Err(e) => {
262                            log::warn!("Checkpoint error: {}", e);
263                        }
264                    }
265                }
266                log::info!("Checkpointer daemon stopped");
267            }));
268        }
269    }
270
271    /// Signals shutdown and waits for all daemon threads to complete.
272    ///
273    /// Shutdown order mirrors JE `EnvironmentImpl.shutdownDaemons`:
274    ///   1. Set the shutdown flag and wake all sleeping daemons.
275    ///   2. Join the **cleaner** first — it can call the checkpointer
276    ///      internally, so it must stop before the checkpointer stops.
277    ///   3. Join the **checkpointer** — must stop before the evictor, because
278    ///      the final checkpoint must complete while the evictor is still able
279    ///      to flush dirty nodes that other daemons produce.
280    ///   4. Join the **evictor** last — it remains available to flush dirty
281    ///      nodes until all other daemons have exited.
282    ///
283    /// JE citation: `EnvironmentImpl.shutdownDaemons` comment:
284    ///   "Cleaner has to be shutdown before checkpointer because former
285    ///   calls the latter."
286    pub fn shutdown(&mut self) {
287        // Step 1: signal shutdown and wake all sleeping daemons immediately
288        // so they do not wait out their full sleep interval.
289        self.shutdown.store(true, Ordering::Relaxed);
290        self.cleaner_wake.notify();
291        self.checkpointer_wake.notify();
292        self.evictor_wake.notify();
293
294        // Step 2: join cleaner first (it may call checkpointer internally).
295        if let Some(handle) = self.cleaner_handle.take()
296            && let Err(e) = handle.join()
297        {
298            log::error!("Failed to join cleaner thread: {:?}", e);
299        }
300
301        // Step 3: join checkpointer after cleaner has stopped.
302        if let Some(handle) = self.checkpointer_handle.take()
303            && let Err(e) = handle.join()
304        {
305            log::error!("Failed to join checkpointer thread: {:?}", e);
306        }
307
308        // Step 4: join evictor last — it must remain available until
309        // the checkpoint completes so dirty nodes can be flushed.
310        if let Some(handle) = self.evictor_handle.take()
311            && let Err(e) = handle.join()
312        {
313            log::error!("Failed to join evictor thread: {:?}", e);
314        }
315    }
316
317    /// Returns `true` while this manager has not been shut down.
318    ///
319    /// Specifically, this returns `true` from construction until
320    /// [`shutdown`](Self::shutdown) is invoked. It does **not** prove that
321    /// any daemon thread is currently alive: a freshly-constructed manager
322    /// (before [`start_daemons`](Self::start_daemons) is called) reports
323    /// `true` here while [`running_count`](Self::running_count) returns 0.
324    ///
325    /// This semantic is codified by `test_daemon_manager_creation`, which
326    /// asserts both `is_running() == true` and `running_count() == 0`
327    /// before any daemons are started. Use `running_count()` if you need
328    /// the actual count of spawned daemon threads.
329    pub fn is_running(&self) -> bool {
330        // NB: name is historical. We return `!shutdown_requested` rather
331        // than checking the JoinHandles so that the post-`new`/pre-`start`
332        // contract above remains stable.
333        !self.shutdown.load(Ordering::Relaxed)
334    }
335
336    /// Returns the number of running daemons.
337    pub fn running_count(&self) -> usize {
338        let mut count = 0;
339        if self.evictor_enabled && self.evictor_handle.is_some() {
340            count += 1;
341        }
342        if self.cleaner_enabled && self.cleaner_handle.is_some() {
343            count += 1;
344        }
345        if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
346            count += 1;
347        }
348        count
349    }
350}
351
352impl Drop for DaemonManager {
353    fn drop(&mut self) {
354        // Ensure clean shutdown
355        if self.is_running() {
356            self.shutdown();
357        }
358    }
359}
360
361/// DST Milestone 2 (Phase 2a) hook: expose the internal sleep/notify handle so
362/// the shuttle test (`tests/shuttle_daemon_shutdown.rs`) can drive the real
363/// daemon-loop-vs-shutdown coordination through shuttle's scheduler.  Only
364/// compiled under `--cfg noxu_shuttle`; invisible to every other build.
365#[cfg(noxu_shuttle)]
366pub mod dst_hooks {
367    pub use super::WakeHandle;
368}
369
370#[cfg(test)]
371mod tests {
372    use super::*;
373    use noxu_evictor::Arbiter;
374    use noxu_recovery::CheckpointConfig;
375    use std::sync::atomic::AtomicI64;
376
377    #[test]
378    fn test_daemon_manager_creation() {
379        let config = EngineConfig::default();
380        let manager = DaemonManager::new(&config);
381
382        assert!(manager.evictor_enabled);
383        assert!(manager.cleaner_enabled);
384        assert!(manager.checkpointer_enabled);
385        assert!(manager.is_running());
386        assert_eq!(manager.running_count(), 0); // Not started yet
387    }
388
389    #[test]
390    fn test_daemon_manager_with_disabled_daemons() {
391        let config = EngineConfig::default()
392            .evictor_enabled(false)
393            .cleaner_enabled(false)
394            .checkpointer_enabled(false);
395        let manager = DaemonManager::new(&config);
396
397        assert!(!manager.evictor_enabled);
398        assert!(!manager.cleaner_enabled);
399        assert!(!manager.checkpointer_enabled);
400    }
401
402    #[test]
403    fn test_daemon_manager_start_and_shutdown() {
404        let config = EngineConfig::default()
405            .evictor_wakeup_interval_ms(100)
406            .cleaner_wakeup_interval_ms(100)
407            .checkpointer_wakeup_interval_ms(100);
408
409        let mut manager = DaemonManager::new(&config);
410
411        // Create subsystems
412        let usage = Arc::new(AtomicI64::new(500));
413        let arbiter = Arbiter::new(1000, usage, 100, 200);
414        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
415        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
416        let checkpointer =
417            Arc::new(Checkpointer::new(CheckpointConfig::default()));
418
419        // Start daemons
420        manager.start_daemons(evictor, cleaner, checkpointer);
421
422        // Give threads time to start
423        thread::sleep(Duration::from_millis(50));
424        assert!(manager.is_running());
425        assert_eq!(manager.running_count(), 3);
426
427        // Shutdown
428        manager.shutdown();
429        assert!(!manager.is_running());
430    }
431
432    #[test]
433    fn test_daemon_manager_selective_daemons() {
434        let config = EngineConfig::default()
435            .evictor_enabled(true)
436            .cleaner_enabled(false)
437            .checkpointer_enabled(true)
438            .evictor_wakeup_interval_ms(100)
439            .checkpointer_wakeup_interval_ms(100);
440
441        let mut manager = DaemonManager::new(&config);
442
443        let usage = Arc::new(AtomicI64::new(500));
444        let arbiter = Arbiter::new(1000, usage, 100, 200);
445        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
446        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
447        let checkpointer =
448            Arc::new(Checkpointer::new(CheckpointConfig::default()));
449
450        manager.start_daemons(evictor, cleaner, checkpointer);
451
452        thread::sleep(Duration::from_millis(50));
453        assert_eq!(manager.running_count(), 2); // Only evictor and checkpointer
454
455        manager.shutdown();
456    }
457
458    #[test]
459    fn test_daemon_manager_drop_cleanup() {
460        let config = EngineConfig::default()
461            .evictor_wakeup_interval_ms(100)
462            .cleaner_wakeup_interval_ms(100)
463            .checkpointer_wakeup_interval_ms(100);
464
465        let mut manager = DaemonManager::new(&config);
466
467        let usage = Arc::new(AtomicI64::new(500));
468        let arbiter = Arbiter::new(1000, usage, 100, 200);
469        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
470        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
471        let checkpointer =
472            Arc::new(Checkpointer::new(CheckpointConfig::default()));
473
474        manager.start_daemons(evictor, cleaner, checkpointer);
475
476        thread::sleep(Duration::from_millis(50));
477        assert!(manager.is_running());
478
479        // Drop should trigger cleanup
480        drop(manager);
481    }
482
483    #[test]
484    fn test_daemon_wakeup_intervals() {
485        let config = EngineConfig::default()
486            .evictor_wakeup_interval_ms(1000)
487            .cleaner_wakeup_interval_ms(2000)
488            .checkpointer_wakeup_interval_ms(3000);
489
490        let manager = DaemonManager::new(&config);
491        assert_eq!(manager.evictor_wakeup_ms, 1000);
492        assert_eq!(manager.cleaner_wakeup_ms, 2000);
493        assert_eq!(manager.checkpointer_wakeup_ms, 3000);
494    }
495
496    /// Verify that shutdown returns quickly even when daemons are configured
497    /// with a long wakeup interval.  If the condvar notification is working,
498    /// this completes in well under the 5-second interval.
499    #[test]
500    fn test_shutdown_wakes_daemons_early() {
501        use std::time::Instant;
502
503        // Use a 5-second interval; shutdown must complete far faster than that.
504        let config = EngineConfig::default()
505            .evictor_wakeup_interval_ms(5000)
506            .cleaner_wakeup_interval_ms(5000)
507            .checkpointer_wakeup_interval_ms(5000);
508
509        let mut manager = DaemonManager::new(&config);
510
511        let usage = Arc::new(AtomicI64::new(500));
512        let arbiter = Arbiter::new(1000, usage, 100, 200);
513        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
514        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
515        let checkpointer =
516            Arc::new(Checkpointer::new(CheckpointConfig::default()));
517
518        manager.start_daemons(evictor, cleaner, checkpointer);
519
520        // Give threads a moment to enter their wait.
521        thread::sleep(Duration::from_millis(50));
522
523        let start = Instant::now();
524        manager.shutdown();
525        let elapsed = start.elapsed();
526
527        // Shutdown must complete in under 1 second even though sleep is 5 s.
528        assert!(
529            elapsed < Duration::from_secs(1),
530            "shutdown took {:?}, expected < 1s",
531            elapsed
532        );
533    }
534
535    #[test]
536    fn test_wake_handle_timeout() {
537        let handle = WakeHandle::new();
538
539        // With no notification the wait should time out (returns false).
540        let notified = handle.wait_timeout(Duration::from_millis(50));
541        assert!(!notified);
542    }
543
544    #[test]
545    fn test_wake_handle_notify() {
546        use std::time::Instant;
547
548        let handle = WakeHandle::new();
549        let handle2 = handle.clone();
550
551        // Spawn a thread that notifies after a short delay.
552        let t = thread::spawn(move || {
553            thread::sleep(Duration::from_millis(20));
554            handle2.notify();
555        });
556
557        let start = Instant::now();
558        // Wait up to 5 seconds; notification should arrive ~20 ms in.
559        let notified = handle.wait_timeout(Duration::from_secs(5));
560        let elapsed = start.elapsed();
561
562        t.join().unwrap();
563
564        assert!(notified, "expected notify to return true");
565        assert!(
566            elapsed < Duration::from_millis(500),
567            "took {:?}, expected wakeup within 500ms",
568            elapsed
569        );
570    }
571
572    // -----------------------------------------------------------------------
573    // CC-3: JE-correct shutdown order (cleaner → checkpointer → evictor)
574    // -----------------------------------------------------------------------
575
576    /// Verifies that the daemons stop in the JE-mandated order:
577    ///   cleaner → checkpointer → evictor.
578    ///
579    /// We instrument DaemonManager's join sequence by using threads that
580    /// block each other: cleaner exits immediately, checkpointer waits for
581    /// the cleaner to be joined, evictor waits for the checkpointer to be
582    /// joined.  If the join order were wrong the test would deadlock (and
583    /// the bounded-time assertion would fire).
584    ///
585    /// Separately we capture the join-completion order from the calling
586    /// thread via a shared sequence counter.
587    ///
588    /// JE reference: `EnvironmentImpl.shutdownDaemons` — "Cleaner has to be
589    /// shutdown before checkpointer because former calls the latter."
590    #[test]
591    fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
592        use std::sync::Mutex;
593        use std::time::Instant;
594
595        // Each daemon thread records a monotone join-sequence number.
596        // The thread blocks until the *previous* daemon in the correct order
597        // has already been joined — this makes a wrong join order deadlock.
598        let join_seq: Arc<Mutex<Vec<&'static str>>> =
599            Arc::new(Mutex::new(Vec::new()));
600
601        let shutdown_flag = Arc::new(AtomicBool::new(false));
602
603        // Barrier pairs: cleaner releases checkpointer; checkpointer releases evictor.
604        let cleaner_joined =
605            Arc::new((Mutex::new(false), std::sync::Condvar::new()));
606        let checkpointer_joined =
607            Arc::new((Mutex::new(false), std::sync::Condvar::new()));
608
609        let wake_c = WakeHandle::new();
610        let wake_cp = WakeHandle::new();
611        let wake_ev = WakeHandle::new();
612
613        // Cleaner: exits immediately after shutdown signal.
614        let sd_c = shutdown_flag.clone();
615        let wake_c2 = wake_c.clone();
616        let cleaner_t = thread::spawn(move || {
617            while !sd_c.load(Ordering::Relaxed) {
618                wake_c2.wait_timeout(Duration::from_millis(5000));
619            }
620            // No blocking — exits right away so join_cleaner completes first.
621        });
622
623        // Checkpointer: waits until cleaner has been joined, then exits.
624        let sd_cp = shutdown_flag.clone();
625        let wake_cp2 = wake_cp.clone();
626        let cj = cleaner_joined.clone();
627        let checkpointer_t = thread::spawn(move || {
628            while !sd_cp.load(Ordering::Relaxed) {
629                wake_cp2.wait_timeout(Duration::from_millis(5000));
630            }
631            // Block until the calling thread has joined the cleaner.
632            let (lock, cv) = &*cj;
633            let mut g = lock.lock().unwrap();
634            while !*g {
635                g = cv.wait(g).unwrap();
636            }
637        });
638
639        // Evictor: waits until checkpointer has been joined, then exits.
640        let sd_ev = shutdown_flag.clone();
641        let wake_ev2 = wake_ev.clone();
642        let cpj = checkpointer_joined.clone();
643        let evictor_t = thread::spawn(move || {
644            while !sd_ev.load(Ordering::Relaxed) {
645                wake_ev2.wait_timeout(Duration::from_millis(5000));
646            }
647            let (lock, cv) = &*cpj;
648            let mut g = lock.lock().unwrap();
649            while !*g {
650                g = cv.wait(g).unwrap();
651            }
652        });
653
654        // Simulate shutdown: signal + wake.
655        shutdown_flag.store(true, Ordering::Relaxed);
656        wake_c.notify();
657        wake_cp.notify();
658        wake_ev.notify();
659
660        let start = Instant::now();
661
662        // Join cleaner first.
663        cleaner_t.join().unwrap();
664        join_seq.lock().unwrap().push("cleaner");
665        {
666            let (l, cv) = &*cleaner_joined;
667            *l.lock().unwrap() = true;
668            cv.notify_all();
669        }
670
671        // Join checkpointer second.
672        checkpointer_t.join().unwrap();
673        join_seq.lock().unwrap().push("checkpointer");
674        {
675            let (l, cv) = &*checkpointer_joined;
676            *l.lock().unwrap() = true;
677            cv.notify_all();
678        }
679
680        // Join evictor last.
681        evictor_t.join().unwrap();
682        join_seq.lock().unwrap().push("evictor");
683
684        let elapsed = start.elapsed();
685        assert!(
686            elapsed < Duration::from_secs(2),
687            "CC-3: shutdown stalled: {:?}",
688            elapsed
689        );
690
691        let order = join_seq.lock().unwrap();
692        assert_eq!(
693            *order,
694            vec!["cleaner", "checkpointer", "evictor"],
695            "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
696        );
697    }
698
699    /// Shutdown must complete within a bounded time even with long wakeup
700    /// intervals — and must NOT deadlock (the join sequence must not block
701    /// a later join waiting on an earlier one).
702    #[test]
703    fn test_cc3_shutdown_no_deadlock_bounded_time() {
704        use std::time::Instant;
705
706        // Very long intervals; shutdown must complete fast via condvar.
707        let config = EngineConfig::default()
708            .evictor_wakeup_interval_ms(10_000)
709            .cleaner_wakeup_interval_ms(10_000)
710            .checkpointer_wakeup_interval_ms(10_000);
711
712        let mut manager = DaemonManager::new(&config);
713
714        let usage = Arc::new(AtomicI64::new(500));
715        let arbiter = Arbiter::new(1000, usage, 100, 200);
716        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
717        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
718        let checkpointer =
719            Arc::new(Checkpointer::new(CheckpointConfig::default()));
720
721        manager.start_daemons(evictor, cleaner, checkpointer);
722        thread::sleep(Duration::from_millis(30));
723
724        let start = Instant::now();
725        manager.shutdown();
726        let elapsed = start.elapsed();
727
728        assert!(
729            elapsed < Duration::from_secs(2),
730            "CC-3: shutdown deadlocked or stalled: took {:?}",
731            elapsed
732        );
733        assert!(!manager.is_running());
734    }
735}