1use crate::engine_config::EngineConfig;
4use noxu_cleaner::Cleaner;
5use noxu_evictor::{EvictionSource, Evictor};
6use noxu_recovery::Checkpointer;
7use noxu_util::dst_sync::atomic::{AtomicBool, Ordering};
8use noxu_util::dst_sync::{Arc, Condvar, Mutex, thread};
9use std::time::Duration;
10
/// A sticky wake-up signal shared between a controller and a sleeping thread.
///
/// All clones share one flag. Once [`WakeHandle::notify`] has been called the
/// flag stays set (there is no reset), so every later `wait_timeout` on this
/// handle or any clone returns `true` immediately.
#[derive(Clone)]
#[doc(hidden)]
pub struct WakeHandle {
    /// `true` once `notify` has run; guarded by the mutex and signalled
    /// through the condition variable.
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    /// Creates a handle that has not been notified.
    pub(crate) fn new() -> Self {
        Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Blocks until the handle is notified or `duration` has elapsed.
    ///
    /// Returns `true` if `notify` has been called on this handle or any clone
    /// (before or during the wait), `false` if the timeout expired first.
    ///
    /// Spurious condition-variable wakeups do not cut the wait short: the
    /// remaining time is recomputed and the wait resumes, so callers using
    /// this as an interval sleep do not run their work early.
    #[doc(hidden)]
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (lock, cvar) = &*self.pair;
        // `None` only if `now + duration` overflows `Instant`; in that case a
        // spurious wakeup simply re-waits the full duration.
        let deadline = std::time::Instant::now().checked_add(duration);
        let mut notified = lock.lock().unwrap();
        while !*notified {
            let remaining = match deadline {
                Some(end) => {
                    let left = end.saturating_duration_since(std::time::Instant::now());
                    if left.is_zero() {
                        break;
                    }
                    left
                }
                None => duration,
            };
            let (guard, result) = cvar.wait_timeout(notified, remaining).unwrap();
            notified = guard;
            // Trust the condvar's own timeout report (needed for simulated
            // condvars whose timeouts do not track wall-clock time).
            if result.timed_out() {
                break;
            }
        }
        *notified
    }

    /// Sets the flag and wakes every thread blocked in `wait_timeout` on this
    /// handle or any of its clones.
    #[doc(hidden)]
    pub fn notify(&self) {
        let (lock, cvar) = &*self.pair;
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }
}

#[cfg(noxu_shuttle)]
impl WakeHandle {
    /// Public constructor available under `cfg(noxu_shuttle)`; exposes the
    /// `pub(crate)` [`WakeHandle::new`] to code outside this crate
    /// (presumably deterministic-simulation harnesses).
    #[doc(hidden)]
    pub fn new_for_shuttle() -> Self {
        Self::new()
    }
}
73
74pub struct DaemonManager {
85 shutdown: Arc<AtomicBool>,
87
88 evictor_wake: WakeHandle,
90 cleaner_wake: WakeHandle,
91 checkpointer_wake: WakeHandle,
92
93 evictor_handle: Option<thread::JoinHandle<()>>,
95
96 cleaner_handle: Option<thread::JoinHandle<()>>,
98
99 checkpointer_handle: Option<thread::JoinHandle<()>>,
101
102 evictor_enabled: bool,
104
105 cleaner_enabled: bool,
107
108 checkpointer_enabled: bool,
110
111 evictor_wakeup_ms: u64,
113
114 cleaner_wakeup_ms: u64,
116
117 checkpointer_wakeup_ms: u64,
119}
120
121impl DaemonManager {
122 pub fn new(config: &EngineConfig) -> Self {
126 Self {
127 shutdown: Arc::new(AtomicBool::new(false)),
128 evictor_wake: WakeHandle::new(),
129 cleaner_wake: WakeHandle::new(),
130 checkpointer_wake: WakeHandle::new(),
131 evictor_handle: None,
132 cleaner_handle: None,
133 checkpointer_handle: None,
134 evictor_enabled: config.evictor_enabled,
135 cleaner_enabled: config.cleaner_enabled,
136 checkpointer_enabled: config.checkpointer_enabled,
137 evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
138 cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
139 checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
140 }
141 }
142
143 pub fn start_daemons(
156 &mut self,
157 evictor: Arc<Evictor>,
158 cleaner: Arc<Cleaner>,
159 checkpointer: Arc<Checkpointer>,
160 ) {
161 if self.evictor_enabled {
163 let shutdown = Arc::clone(&self.shutdown);
164 let wakeup_ms = self.evictor_wakeup_ms;
165 let evictor = Arc::clone(&evictor);
166 let wake = self.evictor_wake.clone();
167
168 self.evictor_handle = Some(thread::spawn(move || {
169 log::info!("Evictor daemon started");
170 while !shutdown.load(Ordering::Relaxed) {
171 let notified =
173 wake.wait_timeout(Duration::from_millis(wakeup_ms));
174 if notified || shutdown.load(Ordering::Relaxed) {
175 break;
176 }
177
178 let result = evictor.do_evict(EvictionSource::Daemon);
180 if result.nodes_evicted > 0 {
181 log::debug!(
182 "Evictor: evicted {} nodes, {} bytes",
183 result.nodes_evicted,
184 result.bytes_evicted
185 );
186 }
187 }
188 log::info!("Evictor daemon stopped");
189 }));
190 }
191
192 if self.cleaner_enabled {
194 let shutdown = Arc::clone(&self.shutdown);
195 let wakeup_ms = self.cleaner_wakeup_ms;
196 let cleaner = Arc::clone(&cleaner);
197 let wake = self.cleaner_wake.clone();
198
199 self.cleaner_handle = Some(thread::spawn(move || {
200 log::info!("Cleaner daemon started");
201 while !shutdown.load(Ordering::Relaxed) {
202 let notified =
204 wake.wait_timeout(Duration::from_millis(wakeup_ms));
205 if notified || shutdown.load(Ordering::Relaxed) {
206 break;
207 }
208
209 match cleaner.do_clean(1, false) {
211 Ok(result) => {
212 if result.files_cleaned > 0 {
213 log::debug!(
214 "Cleaner: cleaned {} files, deleted {} files",
215 result.files_cleaned,
216 result.files_deleted
217 );
218 }
219 }
220 Err(e) => {
221 log::warn!("Cleaner error: {}", e);
222 }
223 }
224 }
225 log::info!("Cleaner daemon stopped");
226 }));
227 }
228
229 if self.checkpointer_enabled {
231 let shutdown = Arc::clone(&self.shutdown);
232 let wakeup_ms = self.checkpointer_wakeup_ms;
233 let checkpointer = Arc::clone(&checkpointer);
234 let wake = self.checkpointer_wake.clone();
235
236 self.checkpointer_handle = Some(thread::spawn(move || {
237 log::info!("Checkpointer daemon started");
238 while !shutdown.load(Ordering::Relaxed) {
239 let notified =
241 wake.wait_timeout(Duration::from_millis(wakeup_ms));
242 if notified || shutdown.load(Ordering::Relaxed) {
243 break;
244 }
245
246 if !checkpointer.is_runnable(false) {
250 continue;
251 }
252 match checkpointer.do_checkpoint("daemon") {
254 Ok(result) => {
255 log::debug!(
256 "Checkpoint: id={}, flushed {} nodes",
257 result.checkpoint_id,
258 result.total_nodes_flushed()
259 );
260 }
261 Err(e) => {
262 log::warn!("Checkpoint error: {}", e);
263 }
264 }
265 }
266 log::info!("Checkpointer daemon stopped");
267 }));
268 }
269 }
270
271 pub fn shutdown(&mut self) {
287 self.shutdown.store(true, Ordering::Relaxed);
290 self.cleaner_wake.notify();
291 self.checkpointer_wake.notify();
292 self.evictor_wake.notify();
293
294 if let Some(handle) = self.cleaner_handle.take()
296 && let Err(e) = handle.join()
297 {
298 log::error!("Failed to join cleaner thread: {:?}", e);
299 }
300
301 if let Some(handle) = self.checkpointer_handle.take()
303 && let Err(e) = handle.join()
304 {
305 log::error!("Failed to join checkpointer thread: {:?}", e);
306 }
307
308 if let Some(handle) = self.evictor_handle.take()
311 && let Err(e) = handle.join()
312 {
313 log::error!("Failed to join evictor thread: {:?}", e);
314 }
315 }
316
317 pub fn is_running(&self) -> bool {
330 !self.shutdown.load(Ordering::Relaxed)
334 }
335
336 pub fn running_count(&self) -> usize {
338 let mut count = 0;
339 if self.evictor_enabled && self.evictor_handle.is_some() {
340 count += 1;
341 }
342 if self.cleaner_enabled && self.cleaner_handle.is_some() {
343 count += 1;
344 }
345 if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
346 count += 1;
347 }
348 count
349 }
350}
351
352impl Drop for DaemonManager {
353 fn drop(&mut self) {
354 if self.is_running() {
356 self.shutdown();
357 }
358 }
359}
360
/// Hooks for deterministic-simulation testing, compiled only with
/// `cfg(noxu_shuttle)`.
///
/// Re-exports [`WakeHandle`] through a public path so code outside this
/// module can name it (presumably shuttle-based test harnesses — confirm
/// against callers).
#[cfg(noxu_shuttle)]
pub mod dst_hooks {
    pub use super::WakeHandle;
}
369
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;

    /// Default config with the same wakeup interval (ms) for all three daemons.
    fn uniform_intervals(ms: u64) -> EngineConfig {
        EngineConfig::default()
            .evictor_wakeup_interval_ms(ms)
            .cleaner_wakeup_interval_ms(ms)
            .checkpointer_wakeup_interval_ms(ms)
    }

    /// Fresh evictor, cleaner and checkpointer built with the small fixed
    /// parameters shared by every daemon test.
    fn components() -> (Arc<Evictor>, Arc<Cleaner>, Arc<Checkpointer>) {
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer = Arc::new(Checkpointer::new(CheckpointConfig::default()));
        (evictor, cleaner, checkpointer)
    }

    /// Creates a manager for `config` and starts its enabled daemons.
    fn start(config: &EngineConfig) -> DaemonManager {
        let mut manager = DaemonManager::new(config);
        let (evictor, cleaner, checkpointer) = components();
        manager.start_daemons(evictor, cleaner, checkpointer);
        manager
    }

    #[test]
    fn test_daemon_manager_creation() {
        let manager = DaemonManager::new(&EngineConfig::default());

        let flags = [
            ("evictor", manager.evictor_enabled),
            ("cleaner", manager.cleaner_enabled),
            ("checkpointer", manager.checkpointer_enabled),
        ];
        for (name, enabled) in flags {
            assert!(enabled, "{name} should be enabled by default");
        }
        // Constructed but not started: running flag set, no threads yet.
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 0);
    }

    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let manager = DaemonManager::new(&config);

        let flags = [
            ("evictor", manager.evictor_enabled),
            ("cleaner", manager.cleaner_enabled),
            ("checkpointer", manager.checkpointer_enabled),
        ];
        for (name, enabled) in flags {
            assert!(!enabled, "{name} should be disabled");
        }
    }

    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let mut manager = start(&uniform_intervals(100));

        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 3);

        manager.shutdown();
        assert!(!manager.is_running());
    }

    #[test]
    fn test_daemon_manager_selective_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        let mut manager = start(&config);

        thread::sleep(Duration::from_millis(50));
        // Only the evictor and checkpointer threads are spawned.
        assert_eq!(manager.running_count(), 2);
        manager.shutdown();
    }

    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let manager = start(&uniform_intervals(100));

        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());

        // Dropping without an explicit shutdown must stop and join the daemons.
        drop(manager);
    }

    #[test]
    fn test_daemon_wakeup_intervals() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);
        let manager = DaemonManager::new(&config);

        let observed = [
            manager.evictor_wakeup_ms,
            manager.cleaner_wakeup_ms,
            manager.checkpointer_wakeup_ms,
        ];
        assert_eq!(observed, [1000, 2000, 3000]);
    }

    #[test]
    fn test_shutdown_wakes_daemons_early() {
        use std::time::Instant;

        // Intervals far longer than the bound below: shutdown can only be
        // fast if the daemons are woken instead of sleeping out the interval.
        let mut manager = start(&uniform_intervals(5000));
        thread::sleep(Duration::from_millis(50));

        let started_at = Instant::now();
        manager.shutdown();
        let took = started_at.elapsed();

        assert!(
            took < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            took
        );
    }

    #[test]
    fn test_wake_handle_timeout() {
        let notified = WakeHandle::new().wait_timeout(Duration::from_millis(50));
        assert!(!notified);
    }

    #[test]
    fn test_wake_handle_notify() {
        use std::time::Instant;

        let waiter = WakeHandle::new();
        let notifier = waiter.clone();

        let t = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            notifier.notify();
        });

        let started_at = Instant::now();
        let notified = waiter.wait_timeout(Duration::from_secs(5));
        let took = started_at.elapsed();

        t.join().unwrap();

        assert!(notified, "expected notify to return true");
        assert!(
            took < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            took
        );
    }

    #[test]
    fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
        use std::sync::{Condvar as StdCondvar, Mutex as StdMutex};
        use std::time::Instant;

        /// One-shot latch: `open` releases every current and future `pass`.
        type Gate = Arc<(StdMutex<bool>, StdCondvar)>;

        fn new_gate() -> Gate {
            Arc::new((StdMutex::new(false), StdCondvar::new()))
        }

        fn open(gate: &Gate) {
            let (lock, cv) = &**gate;
            *lock.lock().unwrap() = true;
            cv.notify_all();
        }

        fn pass(gate: &Gate) {
            let (lock, cv) = &**gate;
            let _opened = cv.wait_while(lock.lock().unwrap(), |is_open| !*is_open).unwrap();
        }

        /// Simulated daemon body: sleep on `wake` until `stop` is set.
        fn idle_until(stop: &AtomicBool, wake: &WakeHandle) {
            while !stop.load(Ordering::Relaxed) {
                wake.wait_timeout(Duration::from_millis(5000));
            }
        }

        let join_seq: Arc<StdMutex<Vec<&'static str>>> = Arc::new(StdMutex::new(Vec::new()));
        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let cleaner_joined = new_gate();
        let checkpointer_joined = new_gate();

        let wake_c = WakeHandle::new();
        let wake_cp = WakeHandle::new();
        let wake_ev = WakeHandle::new();

        // Cleaner: exits as soon as shutdown is observed.
        let cleaner_t = {
            let stop = Arc::clone(&shutdown_flag);
            let wake = wake_c.clone();
            thread::spawn(move || idle_until(&stop, &wake))
        };

        // Checkpointer: may only finish once the cleaner has been joined.
        let checkpointer_t = {
            let stop = Arc::clone(&shutdown_flag);
            let wake = wake_cp.clone();
            let after = Arc::clone(&cleaner_joined);
            thread::spawn(move || {
                idle_until(&stop, &wake);
                pass(&after);
            })
        };

        // Evictor: may only finish once the checkpointer has been joined.
        let evictor_t = {
            let stop = Arc::clone(&shutdown_flag);
            let wake = wake_ev.clone();
            let after = Arc::clone(&checkpointer_joined);
            thread::spawn(move || {
                idle_until(&stop, &wake);
                pass(&after);
            })
        };

        shutdown_flag.store(true, Ordering::Relaxed);
        for wake in [&wake_c, &wake_cp, &wake_ev] {
            wake.notify();
        }

        let started_at = Instant::now();

        cleaner_t.join().unwrap();
        join_seq.lock().unwrap().push("cleaner");
        open(&cleaner_joined);

        checkpointer_t.join().unwrap();
        join_seq.lock().unwrap().push("checkpointer");
        open(&checkpointer_joined);

        evictor_t.join().unwrap();
        join_seq.lock().unwrap().push("evictor");

        let took = started_at.elapsed();
        assert!(
            took < Duration::from_secs(2),
            "CC-3: shutdown stalled: {:?}",
            took
        );

        let order = join_seq.lock().unwrap();
        assert_eq!(
            *order,
            vec!["cleaner", "checkpointer", "evictor"],
            "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
        );
    }

    #[test]
    fn test_cc3_shutdown_no_deadlock_bounded_time() {
        use std::time::Instant;

        let mut manager = start(&uniform_intervals(10_000));
        thread::sleep(Duration::from_millis(30));

        let started_at = Instant::now();
        manager.shutdown();
        let took = started_at.elapsed();

        assert!(
            took < Duration::from_secs(2),
            "CC-3: shutdown deadlocked or stalled: took {:?}",
            took
        );
        assert!(!manager.is_running());
    }
}