1use crate::engine_config::EngineConfig;
18use noxu_cleaner::Cleaner;
19use noxu_evictor::{EvictionSource, Evictor};
20use noxu_recovery::Checkpointer;
21use noxu_util::dst_sync::atomic::{AtomicBool, Ordering};
22use noxu_util::dst_sync::{Arc, Condvar, Mutex, thread};
23use std::time::Duration;
24
/// Sticky, cloneable wake-up signal shared between a daemon thread and the
/// code that wants to interrupt its sleep.
///
/// Every clone shares one `(Mutex<bool>, Condvar)` pair. The boolean starts
/// out `false`; [`WakeHandle::notify`] sets it to `true` and nothing ever
/// resets it. So once notified, every later [`WakeHandle::wait_timeout`] on
/// any clone returns `true` immediately.
#[derive(Clone)]
#[doc(hidden)]
pub struct WakeHandle {
    /// Shared "has been notified" flag plus the condvar used to wake waiters.
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    /// Creates a handle in the not-yet-notified state.
    pub(crate) fn new() -> Self {
        let signal = (Mutex::new(false), Condvar::new());
        Self { pair: Arc::new(signal) }
    }

    /// Blocks for at most `duration`, returning early if the handle is notified.
    ///
    /// Returns `true` if the handle has been notified, either before the call
    /// or while waiting. Returns `false` otherwise. Only one condvar wait is
    /// performed, so a spurious wake-up is reported as `false`, exactly like
    /// a timeout.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[doc(hidden)]
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (flag, condvar) = &*self.pair;
        let notified = flag.lock().unwrap();
        if *notified {
            // Already signalled: don't sleep at all.
            true
        } else {
            let (after_wait, _timed_out) = condvar.wait_timeout(notified, duration).unwrap();
            *after_wait
        }
    }

    /// Marks the handle as notified and wakes every thread currently waiting.
    ///
    /// The flag is sticky, so waiters that arrive after this call also return
    /// immediately.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[doc(hidden)]
    pub fn notify(&self) {
        let (flag, condvar) = &*self.pair;
        {
            let mut notified = flag.lock().unwrap();
            *notified = true;
        } // guard released here, before waking waiters
        condvar.notify_all();
    }
}
78
#[cfg(noxu_shuttle)]
impl WakeHandle {
    /// Public constructor for deterministic-simulation (shuttle) tests.
    ///
    /// `WakeHandle::new` is `pub(crate)`, so external DST harnesses use this
    /// instead. It returns a fresh, not-yet-notified handle.
    #[doc(hidden)]
    pub fn new_for_shuttle() -> Self {
        WakeHandle::new()
    }
}
87
88pub struct DaemonManager {
99 shutdown: Arc<AtomicBool>,
101
102 evictor_wake: WakeHandle,
104 cleaner_wake: WakeHandle,
105 checkpointer_wake: WakeHandle,
106
107 evictor_handle: Option<thread::JoinHandle<()>>,
109
110 cleaner_handle: Option<thread::JoinHandle<()>>,
112
113 checkpointer_handle: Option<thread::JoinHandle<()>>,
115
116 evictor_enabled: bool,
118
119 cleaner_enabled: bool,
121
122 checkpointer_enabled: bool,
124
125 evictor_wakeup_ms: u64,
127
128 cleaner_wakeup_ms: u64,
130
131 checkpointer_wakeup_ms: u64,
133}
134
135impl DaemonManager {
136 pub fn new(config: &EngineConfig) -> Self {
140 Self {
141 shutdown: Arc::new(AtomicBool::new(false)),
142 evictor_wake: WakeHandle::new(),
143 cleaner_wake: WakeHandle::new(),
144 checkpointer_wake: WakeHandle::new(),
145 evictor_handle: None,
146 cleaner_handle: None,
147 checkpointer_handle: None,
148 evictor_enabled: config.evictor_enabled,
149 cleaner_enabled: config.cleaner_enabled,
150 checkpointer_enabled: config.checkpointer_enabled,
151 evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
152 cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
153 checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
154 }
155 }
156
157 pub fn start_daemons(
170 &mut self,
171 evictor: Arc<Evictor>,
172 cleaner: Arc<Cleaner>,
173 checkpointer: Arc<Checkpointer>,
174 ) {
175 if self.evictor_enabled {
177 let shutdown = Arc::clone(&self.shutdown);
178 let wakeup_ms = self.evictor_wakeup_ms;
179 let evictor = Arc::clone(&evictor);
180 let wake = self.evictor_wake.clone();
181
182 self.evictor_handle = Some(thread::spawn(move || {
183 log::info!("Evictor daemon started");
184 while !shutdown.load(Ordering::Relaxed) {
185 let notified =
187 wake.wait_timeout(Duration::from_millis(wakeup_ms));
188 if notified || shutdown.load(Ordering::Relaxed) {
189 break;
190 }
191
192 let result = evictor.do_evict(EvictionSource::Daemon);
194 if result.nodes_evicted > 0 {
195 log::debug!(
196 "Evictor: evicted {} nodes, {} bytes",
197 result.nodes_evicted,
198 result.bytes_evicted
199 );
200 }
201 }
202 log::info!("Evictor daemon stopped");
203 }));
204 }
205
206 if self.cleaner_enabled {
208 let shutdown = Arc::clone(&self.shutdown);
209 let wakeup_ms = self.cleaner_wakeup_ms;
210 let cleaner = Arc::clone(&cleaner);
211 let wake = self.cleaner_wake.clone();
212
213 self.cleaner_handle = Some(thread::spawn(move || {
214 log::info!("Cleaner daemon started");
215 while !shutdown.load(Ordering::Relaxed) {
216 let notified =
218 wake.wait_timeout(Duration::from_millis(wakeup_ms));
219 if notified || shutdown.load(Ordering::Relaxed) {
220 break;
221 }
222
223 match cleaner.do_clean(1, false) {
225 Ok(result) => {
226 if result.files_cleaned > 0 {
227 log::debug!(
228 "Cleaner: cleaned {} files, deleted {} files",
229 result.files_cleaned,
230 result.files_deleted
231 );
232 }
233 }
234 Err(e) => {
235 log::warn!("Cleaner error: {}", e);
236 }
237 }
238 }
239 log::info!("Cleaner daemon stopped");
240 }));
241 }
242
243 if self.checkpointer_enabled {
245 let shutdown = Arc::clone(&self.shutdown);
246 let wakeup_ms = self.checkpointer_wakeup_ms;
247 let checkpointer = Arc::clone(&checkpointer);
248 let wake = self.checkpointer_wake.clone();
249
250 self.checkpointer_handle = Some(thread::spawn(move || {
251 log::info!("Checkpointer daemon started");
252 while !shutdown.load(Ordering::Relaxed) {
253 let notified =
255 wake.wait_timeout(Duration::from_millis(wakeup_ms));
256 if notified || shutdown.load(Ordering::Relaxed) {
257 break;
258 }
259
260 if !checkpointer.is_runnable(false) {
264 continue;
265 }
266 match checkpointer.do_checkpoint("daemon") {
268 Ok(result) => {
269 log::debug!(
270 "Checkpoint: id={}, flushed {} nodes",
271 result.checkpoint_id,
272 result.total_nodes_flushed()
273 );
274 }
275 Err(e) => {
276 log::warn!("Checkpoint error: {}", e);
277 }
278 }
279 }
280 log::info!("Checkpointer daemon stopped");
281 }));
282 }
283 }
284
285 pub fn shutdown(&mut self) {
301 self.shutdown.store(true, Ordering::Relaxed);
304 self.cleaner_wake.notify();
305 self.checkpointer_wake.notify();
306 self.evictor_wake.notify();
307
308 if let Some(handle) = self.cleaner_handle.take()
310 && let Err(e) = handle.join()
311 {
312 log::error!("Failed to join cleaner thread: {:?}", e);
313 }
314
315 if let Some(handle) = self.checkpointer_handle.take()
317 && let Err(e) = handle.join()
318 {
319 log::error!("Failed to join checkpointer thread: {:?}", e);
320 }
321
322 if let Some(handle) = self.evictor_handle.take()
325 && let Err(e) = handle.join()
326 {
327 log::error!("Failed to join evictor thread: {:?}", e);
328 }
329 }
330
331 pub fn is_running(&self) -> bool {
344 !self.shutdown.load(Ordering::Relaxed)
348 }
349
350 pub fn running_count(&self) -> usize {
352 let mut count = 0;
353 if self.evictor_enabled && self.evictor_handle.is_some() {
354 count += 1;
355 }
356 if self.cleaner_enabled && self.cleaner_handle.is_some() {
357 count += 1;
358 }
359 if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
360 count += 1;
361 }
362 count
363 }
364}
365
366impl Drop for DaemonManager {
367 fn drop(&mut self) {
368 if self.is_running() {
370 self.shutdown();
371 }
372 }
373}
374
#[cfg(noxu_shuttle)]
/// Items re-exported for deterministic-simulation (shuttle) tests.
///
/// This module is compiled only under the `noxu_shuttle` cfg.
pub mod dst_hooks {
    pub use super::WakeHandle;
}
383
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;

    /// A default-configured manager has all three daemons enabled, reports
    /// running (shutdown not requested), and owns no threads yet.
    #[test]
    fn test_daemon_manager_creation() {
        let config = EngineConfig::default();
        let manager = DaemonManager::new(&config);

        assert!(manager.evictor_enabled);
        assert!(manager.cleaner_enabled);
        assert!(manager.checkpointer_enabled);
        assert!(manager.is_running());
        // No threads exist until start_daemons is called.
        assert_eq!(manager.running_count(), 0);
    }

    /// The per-daemon enable flags are copied from the config.
    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let manager = DaemonManager::new(&config);

        assert!(!manager.evictor_enabled);
        assert!(!manager.cleaner_enabled);
        assert!(!manager.checkpointer_enabled);
    }

    /// Starting all three daemons yields three handles; shutdown clears the
    /// running flag.
    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = DaemonManager::new(&config);

        // Minimal real collaborators for the daemons to call into.
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        // Give the threads a moment to start (shorter than the 100ms interval).
        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 3);

        manager.shutdown();
        assert!(!manager.is_running());
    }

    /// Only enabled daemons are spawned (cleaner disabled here → 2 threads).
    #[test]
    fn test_daemon_manager_selective_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        thread::sleep(Duration::from_millis(50));
        assert_eq!(manager.running_count(), 2);
        manager.shutdown();
    }

    /// Dropping a running manager without calling shutdown() must stop and
    /// join its threads (via the Drop impl) rather than hang or panic.
    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());

        // Drop performs the shutdown; reaching the end of the test is the check.
        drop(manager);
    }

    /// Wake-up intervals are copied verbatim from the config.
    #[test]
    fn test_daemon_wakeup_intervals() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);

        let manager = DaemonManager::new(&config);
        assert_eq!(manager.evictor_wakeup_ms, 1000);
        assert_eq!(manager.cleaner_wakeup_ms, 2000);
        assert_eq!(manager.checkpointer_wakeup_ms, 3000);
    }

    /// With 5s intervals, shutdown must still return quickly because it
    /// notifies each daemon's WakeHandle instead of waiting out the sleep.
    #[test]
    fn test_shutdown_wakes_daemons_early() {
        use std::time::Instant;

        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(5000)
            .cleaner_wakeup_interval_ms(5000)
            .checkpointer_wakeup_interval_ms(5000);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);

        // Let the daemons reach their (long) wait before shutting down.
        thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();

        // Well under the 5s interval, so the wake-up path must have fired.
        assert!(
            elapsed < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            elapsed
        );
    }

    /// An un-notified handle reports `false` after its timeout elapses.
    #[test]
    fn test_wake_handle_timeout() {
        let handle = WakeHandle::new();

        let notified = handle.wait_timeout(Duration::from_millis(50));
        assert!(!notified);
    }

    /// notify() from another thread wakes a waiter long before its 5s timeout
    /// and makes wait_timeout return `true`.
    #[test]
    fn test_wake_handle_notify() {
        use std::time::Instant;

        let handle = WakeHandle::new();
        let handle2 = handle.clone();

        let t = thread::spawn(move || {
            // Delay so the main thread is most likely already waiting.
            thread::sleep(Duration::from_millis(20));
            handle2.notify();
        });

        let start = Instant::now();
        let notified = handle.wait_timeout(Duration::from_secs(5));
        let elapsed = start.elapsed();

        t.join().unwrap();

        assert!(notified, "expected notify to return true");
        assert!(
            elapsed < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            elapsed
        );
    }

    /// CC-3: models the cleaner → checkpointer → evictor shutdown join order
    /// with stand-in threads. The checkpointer stand-in cannot exit until the
    /// cleaner has been joined, and the evictor stand-in cannot exit until
    /// the checkpointer has been joined.
    ///
    /// NOTE(review): this test does not exercise DaemonManager itself, and
    /// `join_seq` is pushed by the test thread in a fixed order. The ordering
    /// assertion therefore cannot fail; what it really checks is that this
    /// gating protocol completes within the time bound without deadlocking.
    #[test]
    fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
        use std::sync::Mutex;
        use std::time::Instant;

        // Records the order in which the test thread observed each join.
        let join_seq: Arc<Mutex<Vec<&'static str>>> =
            Arc::new(Mutex::new(Vec::new()));

        let shutdown_flag = Arc::new(AtomicBool::new(false));

        // "X has been joined" latches used to gate the later stand-ins.
        let cleaner_joined =
            Arc::new((Mutex::new(false), std::sync::Condvar::new()));
        let checkpointer_joined =
            Arc::new((Mutex::new(false), std::sync::Condvar::new()));

        let wake_c = WakeHandle::new();
        let wake_cp = WakeHandle::new();
        let wake_ev = WakeHandle::new();

        // Cleaner stand-in: sleeps until shutdown, then exits immediately.
        let sd_c = shutdown_flag.clone();
        let wake_c2 = wake_c.clone();
        let cleaner_t = thread::spawn(move || {
            while !sd_c.load(Ordering::Relaxed) {
                wake_c2.wait_timeout(Duration::from_millis(5000));
            }
        });

        // Checkpointer stand-in: after shutdown, blocks until cleaner is joined.
        let sd_cp = shutdown_flag.clone();
        let wake_cp2 = wake_cp.clone();
        let cj = cleaner_joined.clone();
        let checkpointer_t = thread::spawn(move || {
            while !sd_cp.load(Ordering::Relaxed) {
                wake_cp2.wait_timeout(Duration::from_millis(5000));
            }
            let (lock, cv) = &*cj;
            let mut g = lock.lock().unwrap();
            while !*g {
                g = cv.wait(g).unwrap();
            }
        });

        // Evictor stand-in: after shutdown, blocks until checkpointer is joined.
        let sd_ev = shutdown_flag.clone();
        let wake_ev2 = wake_ev.clone();
        let cpj = checkpointer_joined.clone();
        let evictor_t = thread::spawn(move || {
            while !sd_ev.load(Ordering::Relaxed) {
                wake_ev2.wait_timeout(Duration::from_millis(5000));
            }
            let (lock, cv) = &*cpj;
            let mut g = lock.lock().unwrap();
            while !*g {
                g = cv.wait(g).unwrap();
            }
        });

        // Same signalling sequence as DaemonManager::shutdown.
        shutdown_flag.store(true, Ordering::Relaxed);
        wake_c.notify();
        wake_cp.notify();
        wake_ev.notify();

        let start = Instant::now();

        cleaner_t.join().unwrap();
        join_seq.lock().unwrap().push("cleaner");
        {
            let (l, cv) = &*cleaner_joined;
            *l.lock().unwrap() = true;
            cv.notify_all();
        }

        checkpointer_t.join().unwrap();
        join_seq.lock().unwrap().push("checkpointer");
        {
            let (l, cv) = &*checkpointer_joined;
            *l.lock().unwrap() = true;
            cv.notify_all();
        }

        evictor_t.join().unwrap();
        join_seq.lock().unwrap().push("evictor");

        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown stalled: {:?}",
            elapsed
        );

        let order = join_seq.lock().unwrap();
        assert_eq!(
            *order,
            vec!["cleaner", "checkpointer", "evictor"],
            "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
        );
    }

    /// CC-3: with 10s intervals, a real DaemonManager shutdown must complete
    /// in bounded time (no deadlock between the ordered joins).
    #[test]
    fn test_cc3_shutdown_no_deadlock_bounded_time() {
        use std::time::Instant;

        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(10_000)
            .cleaner_wakeup_interval_ms(10_000)
            .checkpointer_wakeup_interval_ms(10_000);

        let mut manager = DaemonManager::new(&config);

        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        let evictor = Arc::new(Evictor::new(arbiter, 100, false));
        let cleaner = Arc::new(Cleaner::new(50, 5, 0));
        let checkpointer =
            Arc::new(Checkpointer::new(CheckpointConfig::default()));

        manager.start_daemons(evictor, cleaner, checkpointer);
        thread::sleep(Duration::from_millis(30));

        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown deadlocked or stalled: took {:?}",
            elapsed
        );
        assert!(!manager.is_running());
    }
}