noxu-engine 7.1.0

Engine orchestration for Noxu DB
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
//! Background daemon lifecycle management.
//!
//! # DST M1.1 note: no injectable Clock is threaded here (deliberate)
//!
//! The daemon wakeup interval is a *config-supplied* `Duration`
//! (`evictor_wakeup_ms` / `cleaner_wakeup_ms` / `checkpointer_wakeup_ms`),
//! passed straight to [`WakeHandle::wait_timeout`]; there is no hardcoded
//! `std::time` control-flow site to virtualise.  The shutdown path's liveness
//! is **notify-driven** (`shutdown()` sets the flag and calls
//! `WakeHandle::notify`), not timeout-driven — which is exactly why the
//! Milestone-2 shuttle gate (`tests/shuttle_daemon_shutdown.rs`) proves it
//! deadlock-free without any clock injection.  A `SimClock` would add nothing
//! the config `Duration` + the notify seam do not already give, so the Clock
//! is intentionally NOT threaded through the daemon loops (matches the M1
//! deferral rationale).

use crate::engine_config::EngineConfig;
use noxu_cleaner::Cleaner;
use noxu_evictor::{EvictionSource, Evictor};
use noxu_recovery::Checkpointer;
use noxu_util::dst_sync::atomic::{AtomicBool, Ordering};
use noxu_util::dst_sync::{Arc, Condvar, Mutex, thread};
use std::time::Duration;

/// A wakeup handle used by daemon threads to sleep with early-exit on shutdown.
///
/// Each daemon receives a clone of this handle. When `notify()` is called
/// (at shutdown), the daemon wakes from its sleep immediately rather than
/// waiting for the full interval to elapse.
///
/// `pub(crate)` so the DST Milestone 2 shuttle test
/// (`tests/shuttle_daemon_shutdown.rs`, gated behind `--cfg noxu_shuttle`) can
/// drive the real sleep/notify coordination through shuttle's scheduler.
#[derive(Clone)]
#[doc(hidden)]
pub struct WakeHandle {
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    pub(crate) fn new() -> Self {
        Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Sleep for `duration`, but return early if `notify()` is called.
    ///
    /// Returns `true` if the wakeup was triggered by a shutdown notification,
    /// `false` if the timeout elapsed normally.
    ///
    /// The notify flag is checked *before* blocking on the condvar: a
    /// `notify()` that lands between the caller's previous loop iteration and
    /// this call sets the flag under the mutex, and `notify_all` on a condvar
    /// with no waiter is a no-op.  Without the pre-check the daemon would block
    /// for the full `duration` despite an already-pending notify (a lost
    /// wakeup) — in production merely a shutdown *stall* up to `duration`, but a
    /// hang under the DST shuttle scheduler, whose `wait_timeout` never times
    /// out.  The pre-check closes that window.  (Surfaced by the Milestone-2
    /// shuttle gate, `tests/shuttle_daemon_shutdown.rs`.)
    #[doc(hidden)]
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (lock, cvar) = &*self.pair;
        let guard = lock.lock().unwrap();
        if *guard {
            return true;
        }
        let (guard, _) = cvar.wait_timeout(guard, duration).unwrap();
        *guard
    }

    /// Notify the sleeping daemon to wake up immediately.
    #[doc(hidden)]
    pub fn notify(&self) {
        let (lock, cvar) = &*self.pair;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }
}

#[cfg(noxu_shuttle)]
impl WakeHandle {
    /// Shuttle-test constructor (mirrors [`WakeHandle::new`]).
    #[doc(hidden)]
    pub fn new_for_shuttle() -> Self {
        Self::new()
    }
}

/// Manages the lifecycle of background daemon threads.
///
/// The DaemonManager is responsible for:
/// - Starting daemon threads (evictor, cleaner, checkpointer)
/// - Coordinating shutdown of all daemons
/// - Tracking daemon running state
///
/// Each daemon runs in its own thread, periodically waking up to perform work.
/// On shutdown, daemons are notified via a Condvar so they exit immediately
/// instead of sleeping through their full wakeup interval.
pub struct DaemonManager {
    /// Shutdown signal shared by all daemon threads.
    shutdown: Arc<AtomicBool>,

    /// Wakeup handles for each daemon (used to unblock their sleep on shutdown).
    evictor_wake: WakeHandle,
    cleaner_wake: WakeHandle,
    checkpointer_wake: WakeHandle,

    /// Evictor daemon thread handle.
    evictor_handle: Option<thread::JoinHandle<()>>,

    /// Cleaner daemon thread handle.
    cleaner_handle: Option<thread::JoinHandle<()>>,

    /// Checkpointer daemon thread handle.
    checkpointer_handle: Option<thread::JoinHandle<()>>,

    /// Whether evictor is enabled.
    evictor_enabled: bool,

    /// Whether cleaner is enabled.
    cleaner_enabled: bool,

    /// Whether checkpointer is enabled.
    checkpointer_enabled: bool,

    /// Evictor wakeup interval.
    evictor_wakeup_ms: u64,

    /// Cleaner wakeup interval.
    cleaner_wakeup_ms: u64,

    /// Checkpointer wakeup interval.
    checkpointer_wakeup_ms: u64,
}

impl DaemonManager {
    /// Creates a new DaemonManager from the given configuration.
    ///
    /// Daemons are not started until `start_daemons()` is called.
    pub fn new(config: &EngineConfig) -> Self {
        Self {
            shutdown: Arc::new(AtomicBool::new(false)),
            evictor_wake: WakeHandle::new(),
            cleaner_wake: WakeHandle::new(),
            checkpointer_wake: WakeHandle::new(),
            evictor_handle: None,
            cleaner_handle: None,
            checkpointer_handle: None,
            evictor_enabled: config.evictor_enabled,
            cleaner_enabled: config.cleaner_enabled,
            checkpointer_enabled: config.checkpointer_enabled,
            evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
            cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
            checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
        }
    }

    /// Starts all enabled daemon threads.
    ///
    /// Each daemon runs in a loop:
    /// 1. Sleep for its wakeup interval
    /// 2. Check shutdown flag
    /// 3. Perform work (eviction, cleaning, checkpoint)
    /// 4. Repeat
    ///
    /// # Arguments
    /// * `evictor` - The evictor to use for eviction operations
    /// * `cleaner` - The cleaner to use for cleaning operations
    /// * `checkpointer` - The checkpointer to use for checkpoint operations
    pub fn start_daemons(
        &mut self,
        evictor: Arc<Evictor>,
        cleaner: Arc<Cleaner>,
        checkpointer: Arc<Checkpointer>,
    ) {
        // Start evictor daemon
        if self.evictor_enabled {
            let shutdown = Arc::clone(&self.shutdown);
            let wakeup_ms = self.evictor_wakeup_ms;
            let evictor = Arc::clone(&evictor);
            let wake = self.evictor_wake.clone();

            self.evictor_handle = Some(thread::spawn(move || {
                log::info!("Evictor daemon started");
                while !shutdown.load(Ordering::Relaxed) {
                    // Sleep for the wakeup interval, but return early on shutdown.
                    let notified =
                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
                    if notified || shutdown.load(Ordering::Relaxed) {
                        break;
                    }

                    // Perform eviction
                    let result = evictor.do_evict(EvictionSource::Daemon);
                    if result.nodes_evicted > 0 {
                        log::debug!(
                            "Evictor: evicted {} nodes, {} bytes",
                            result.nodes_evicted,
                            result.bytes_evicted
                        );
                    }
                }
                log::info!("Evictor daemon stopped");
            }));
        }

        // Start cleaner daemon
        if self.cleaner_enabled {
            let shutdown = Arc::clone(&self.shutdown);
            let wakeup_ms = self.cleaner_wakeup_ms;
            let cleaner = Arc::clone(&cleaner);
            let wake = self.cleaner_wake.clone();

            self.cleaner_handle = Some(thread::spawn(move || {
                log::info!("Cleaner daemon started");
                while !shutdown.load(Ordering::Relaxed) {
                    // Sleep for the wakeup interval, but return early on shutdown.
                    let notified =
                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
                    if notified || shutdown.load(Ordering::Relaxed) {
                        break;
                    }

                    // Perform cleaning
                    match cleaner.do_clean(1, false) {
                        Ok(result) => {
                            if result.files_cleaned > 0 {
                                log::debug!(
                                    "Cleaner: cleaned {} files, deleted {} files",
                                    result.files_cleaned,
                                    result.files_deleted
                                );
                            }
                        }
                        Err(e) => {
                            log::warn!("Cleaner error: {}", e);
                        }
                    }
                }
                log::info!("Cleaner daemon stopped");
            }));
        }

        // Start checkpointer daemon
        if self.checkpointer_enabled {
            let shutdown = Arc::clone(&self.shutdown);
            let wakeup_ms = self.checkpointer_wakeup_ms;
            let checkpointer = Arc::clone(&checkpointer);
            let wake = self.checkpointer_wake.clone();

            self.checkpointer_handle = Some(thread::spawn(move || {
                log::info!("Checkpointer daemon started");
                while !shutdown.load(Ordering::Relaxed) {
                    // Sleep for the wakeup interval, but return early on shutdown.
                    let notified =
                        wake.wait_timeout(Duration::from_millis(wakeup_ms));
                    if notified || shutdown.load(Ordering::Relaxed) {
                        break;
                    }

                    // JE Checkpointer.isRunnable: skip the periodic checkpoint
                    // on an idle environment (nothing written since the last
                    // one) instead of writing a CheckpointEnd every wakeup.
                    if !checkpointer.is_runnable(false) {
                        continue;
                    }
                    // Perform checkpoint
                    match checkpointer.do_checkpoint("daemon") {
                        Ok(result) => {
                            log::debug!(
                                "Checkpoint: id={}, flushed {} nodes",
                                result.checkpoint_id,
                                result.total_nodes_flushed()
                            );
                        }
                        Err(e) => {
                            log::warn!("Checkpoint error: {}", e);
                        }
                    }
                }
                log::info!("Checkpointer daemon stopped");
            }));
        }
    }

    /// Signals shutdown and waits for all daemon threads to complete.
    ///
    /// Shutdown order mirrors JE `EnvironmentImpl.shutdownDaemons`:
    ///   1. Set the shutdown flag and wake all sleeping daemons.
    ///   2. Join the **cleaner** first — it can call the checkpointer
    ///      internally, so it must stop before the checkpointer stops.
    ///   3. Join the **checkpointer** — must stop before the evictor, because
    ///      the final checkpoint must complete while the evictor is still able
    ///      to flush dirty nodes that other daemons produce.
    ///   4. Join the **evictor** last — it remains available to flush dirty
    ///      nodes until all other daemons have exited.
    ///
    /// JE citation: `EnvironmentImpl.shutdownDaemons` comment:
    ///   "Cleaner has to be shutdown before checkpointer because former
    ///   calls the latter."
    pub fn shutdown(&mut self) {
        // Step 1: signal shutdown and wake all sleeping daemons immediately
        // so they do not wait out their full sleep interval.
        self.shutdown.store(true, Ordering::Relaxed);
        self.cleaner_wake.notify();
        self.checkpointer_wake.notify();
        self.evictor_wake.notify();

        // Step 2: join cleaner first (it may call checkpointer internally).
        if let Some(handle) = self.cleaner_handle.take()
            && let Err(e) = handle.join()
        {
            log::error!("Failed to join cleaner thread: {:?}", e);
        }

        // Step 3: join checkpointer after cleaner has stopped.
        if let Some(handle) = self.checkpointer_handle.take()
            && let Err(e) = handle.join()
        {
            log::error!("Failed to join checkpointer thread: {:?}", e);
        }

        // Step 4: join evictor last — it must remain available until
        // the checkpoint completes so dirty nodes can be flushed.
        if let Some(handle) = self.evictor_handle.take()
            && let Err(e) = handle.join()
        {
            log::error!("Failed to join evictor thread: {:?}", e);
        }
    }

    /// Returns `true` while this manager has not been shut down.
    ///
    /// Specifically, this returns `true` from construction until
    /// [`shutdown`](Self::shutdown) is invoked. It does **not** prove that
    /// any daemon thread is currently alive: a freshly-constructed manager
    /// (before [`start_daemons`](Self::start_daemons) is called) reports
    /// `true` here while [`running_count`](Self::running_count) returns 0.
    ///
    /// This semantic is codified by `test_daemon_manager_creation`, which
    /// asserts both `is_running() == true` and `running_count() == 0`
    /// before any daemons are started. Use `running_count()` if you need
    /// the actual count of spawned daemon threads.
    pub fn is_running(&self) -> bool {
        // NB: name is historical. We return `!shutdown_requested` rather
        // than checking the JoinHandles so that the post-`new`/pre-`start`
        // contract above remains stable.
        !self.shutdown.load(Ordering::Relaxed)
    }

    /// Returns the number of running daemons.
    pub fn running_count(&self) -> usize {
        let mut count = 0;
        if self.evictor_enabled && self.evictor_handle.is_some() {
            count += 1;
        }
        if self.cleaner_enabled && self.cleaner_handle.is_some() {
            count += 1;
        }
        if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
            count += 1;
        }
        count
    }
}

impl Drop for DaemonManager {
    fn drop(&mut self) {
        // Ensure clean shutdown
        if self.is_running() {
            self.shutdown();
        }
    }
}

/// DST Milestone 2 (Phase 2a) hook: expose the internal sleep/notify handle so
/// the shuttle test (`tests/shuttle_daemon_shutdown.rs`) can drive the real
/// daemon-loop-vs-shutdown coordination through shuttle's scheduler.  Only
/// compiled under `--cfg noxu_shuttle`; invisible to every other build.
#[cfg(noxu_shuttle)]
pub mod dst_hooks {
    pub use super::WakeHandle;
}

#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;

    #[test]
    fn test_daemon_manager_creation() {
        let config = EngineConfig::default();
        let manager = DaemonManager::new(&config);

        assert!(manager.evictor_enabled);
        assert!(manager.cleaner_enabled);
        assert!(manager.checkpointer_enabled);
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 0); // Not started yet
    }

    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let manager = DaemonManager::new(&config);

        assert!(!manager.evictor_enabled);
        assert!(!manager.cleaner_enabled);
        assert!(!manager.checkpointer_enabled);
    }

    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = DaemonManager::new(&config);

        // Create subsystems
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        // Start daemons
        manager.start_daemons(evictor, cleaner, checkpointer);

        // Give threads time to start
        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 3);

        // Shutdown
        manager.shutdown();
        assert!(!manager.is_running());
    }

    #[test]
    fn test_daemon_manager_selective_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        thread::sleep(Duration::from_millis(50));
        assert_eq!(manager.running_count(), 2); // Only evictor and checkpointer

        manager.shutdown();
    }

    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());

        // Drop should trigger cleanup
        drop(manager);
    }

    #[test]
    fn test_daemon_wakeup_intervals() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);

        let manager = DaemonManager::new(&config);
        assert_eq!(manager.evictor_wakeup_ms, 1000);
        assert_eq!(manager.cleaner_wakeup_ms, 2000);
        assert_eq!(manager.checkpointer_wakeup_ms, 3000);
    }

    /// Verify that shutdown returns quickly even when daemons are configured
    /// with a long wakeup interval.  If the condvar notification is working,
    /// this completes in well under the 5-second interval.
    #[test]
    fn test_shutdown_wakes_daemons_early() {
        use std::time::Instant;

        // Use a 5-second interval; shutdown must complete far faster than that.
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(5000)
            .cleaner_wakeup_interval_ms(5000)
            .checkpointer_wakeup_interval_ms(5000);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        // Give threads a moment to enter their wait.
        thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();

        // Shutdown must complete in under 1 second even though sleep is 5 s.
        assert!(
            elapsed < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            elapsed
        );
    }

    #[test]
    fn test_wake_handle_timeout() {
        let handle = WakeHandle::new();

        // With no notification the wait should time out (returns false).
        let notified = handle.wait_timeout(Duration::from_millis(50));
        assert!(!notified);
    }

    #[test]
    fn test_wake_handle_notify() {
        use std::time::Instant;

        let handle = WakeHandle::new();
        let handle2 = handle.clone();

        // Spawn a thread that notifies after a short delay.
        let t = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            handle2.notify();
        });

        let start = Instant::now();
        // Wait up to 5 seconds; notification should arrive ~20 ms in.
        let notified = handle.wait_timeout(Duration::from_secs(5));
        let elapsed = start.elapsed();

        t.join().unwrap();

        assert!(notified, "expected notify to return true");
        assert!(
            elapsed < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            elapsed
        );
    }

    // -----------------------------------------------------------------------
    // CC-3: JE-correct shutdown order (cleaner → checkpointer → evictor)
    // -----------------------------------------------------------------------

    /// Verifies that the daemons stop in the JE-mandated order:
    ///   cleaner → checkpointer → evictor.
    ///
    /// We instrument DaemonManager's join sequence by using threads that
    /// block each other: cleaner exits immediately, checkpointer waits for
    /// the cleaner to be joined, evictor waits for the checkpointer to be
    /// joined.  If the join order were wrong the test would deadlock (and
    /// the bounded-time assertion would fire).
    ///
    /// Separately we capture the join-completion order from the calling
    /// thread via a shared sequence counter.
    ///
    /// JE reference: `EnvironmentImpl.shutdownDaemons` — "Cleaner has to be
    /// shutdown before checkpointer because former calls the latter."
    #[test]
    fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
        use std::sync::Mutex;
        use std::time::Instant;

        // Each daemon thread records a monotone join-sequence number.
        // The thread blocks until the *previous* daemon in the correct order
        // has already been joined — this makes a wrong join order deadlock.
        let join_seq: Arc<Mutex<Vec<&'static str>>> =
            Arc::new(Mutex::new(Vec::new()));

        let shutdown_flag = Arc::new(AtomicBool::new(false));

        // Barrier pairs: cleaner releases checkpointer; checkpointer releases evictor.
        let cleaner_joined =
            Arc::new((Mutex::new(false), std::sync::Condvar::new()));
        let checkpointer_joined =
            Arc::new((Mutex::new(false), std::sync::Condvar::new()));

        let wake_c = WakeHandle::new();
        let wake_cp = WakeHandle::new();
        let wake_ev = WakeHandle::new();

        // Cleaner: exits immediately after shutdown signal.
        let sd_c = shutdown_flag.clone();
        let wake_c2 = wake_c.clone();
        let cleaner_t = thread::spawn(move || {
            while !sd_c.load(Ordering::Relaxed) {
                wake_c2.wait_timeout(Duration::from_millis(5000));
            }
            // No blocking — exits right away so join_cleaner completes first.
        });

        // Checkpointer: waits until cleaner has been joined, then exits.
        let sd_cp = shutdown_flag.clone();
        let wake_cp2 = wake_cp.clone();
        let cj = cleaner_joined.clone();
        let checkpointer_t = thread::spawn(move || {
            while !sd_cp.load(Ordering::Relaxed) {
                wake_cp2.wait_timeout(Duration::from_millis(5000));
            }
            // Block until the calling thread has joined the cleaner.
            let (lock, cv) = &*cj;
            let mut g = lock.lock().unwrap();
            while !*g {
                g = cv.wait(g).unwrap();
            }
        });

        // Evictor: waits until checkpointer has been joined, then exits.
        let sd_ev = shutdown_flag.clone();
        let wake_ev2 = wake_ev.clone();
        let cpj = checkpointer_joined.clone();
        let evictor_t = thread::spawn(move || {
            while !sd_ev.load(Ordering::Relaxed) {
                wake_ev2.wait_timeout(Duration::from_millis(5000));
            }
            let (lock, cv) = &*cpj;
            let mut g = lock.lock().unwrap();
            while !*g {
                g = cv.wait(g).unwrap();
            }
        });

        // Simulate shutdown: signal + wake.
        shutdown_flag.store(true, Ordering::Relaxed);
        wake_c.notify();
        wake_cp.notify();
        wake_ev.notify();

        let start = Instant::now();

        // Join cleaner first.
        cleaner_t.join().unwrap();
        join_seq.lock().unwrap().push("cleaner");
        {
            let (l, cv) = &*cleaner_joined;
            *l.lock().unwrap() = true;
            cv.notify_all();
        }

        // Join checkpointer second.
        checkpointer_t.join().unwrap();
        join_seq.lock().unwrap().push("checkpointer");
        {
            let (l, cv) = &*checkpointer_joined;
            *l.lock().unwrap() = true;
            cv.notify_all();
        }

        // Join evictor last.
        evictor_t.join().unwrap();
        join_seq.lock().unwrap().push("evictor");

        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown stalled: {:?}",
            elapsed
        );

        let order = join_seq.lock().unwrap();
        assert_eq!(
            *order,
            vec!["cleaner", "checkpointer", "evictor"],
            "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
        );
    }

    /// Shutdown must complete within a bounded time even with long wakeup
    /// intervals — and must NOT deadlock (the join sequence must not block
    /// a later join waiting on an earlier one).
    #[test]
    fn test_cc3_shutdown_no_deadlock_bounded_time() {
        use std::time::Instant;

        // Very long intervals; shutdown must complete fast via condvar.
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(10_000)
            .cleaner_wakeup_interval_ms(10_000)
            .checkpointer_wakeup_interval_ms(10_000);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);
        thread::sleep(Duration::from_millis(30));

        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown deadlocked or stalled: took {:?}",
            elapsed
        );
        assert!(!manager.is_running());
    }
}