1use crate::engine_config::EngineConfig;
4use noxu_cleaner::Cleaner;
5use noxu_evictor::{EvictionSource, Evictor};
6use noxu_recovery::Checkpointer;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
12#[derive(Clone)]
18struct WakeHandle {
19 pair: Arc<(Mutex<bool>, Condvar)>,
20}
21
22impl WakeHandle {
23 fn new() -> Self {
24 Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
25 }
26
27 fn wait_timeout(&self, duration: Duration) -> bool {
32 let (lock, cvar) = &*self.pair;
33 let guard = lock.lock().unwrap();
34 let (guard, _) = cvar.wait_timeout(guard, duration).unwrap();
35 *guard
36 }
37
38 fn notify(&self) {
40 let (lock, cvar) = &*self.pair;
41 *lock.lock().unwrap() = true;
42 cvar.notify_all();
43 }
44}
45
46pub struct DaemonManager {
57 shutdown: Arc<AtomicBool>,
59
60 evictor_wake: WakeHandle,
62 cleaner_wake: WakeHandle,
63 checkpointer_wake: WakeHandle,
64
65 evictor_handle: Option<JoinHandle<()>>,
67
68 cleaner_handle: Option<JoinHandle<()>>,
70
71 checkpointer_handle: Option<JoinHandle<()>>,
73
74 evictor_enabled: bool,
76
77 cleaner_enabled: bool,
79
80 checkpointer_enabled: bool,
82
83 evictor_wakeup_ms: u64,
85
86 cleaner_wakeup_ms: u64,
88
89 checkpointer_wakeup_ms: u64,
91}
92
93impl DaemonManager {
94 pub fn new(config: &EngineConfig) -> Self {
98 Self {
99 shutdown: Arc::new(AtomicBool::new(false)),
100 evictor_wake: WakeHandle::new(),
101 cleaner_wake: WakeHandle::new(),
102 checkpointer_wake: WakeHandle::new(),
103 evictor_handle: None,
104 cleaner_handle: None,
105 checkpointer_handle: None,
106 evictor_enabled: config.evictor_enabled,
107 cleaner_enabled: config.cleaner_enabled,
108 checkpointer_enabled: config.checkpointer_enabled,
109 evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
110 cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
111 checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
112 }
113 }
114
115 pub fn start_daemons(
128 &mut self,
129 evictor: Arc<Evictor>,
130 cleaner: Arc<Cleaner>,
131 checkpointer: Arc<Checkpointer>,
132 ) {
133 if self.evictor_enabled {
135 let shutdown = Arc::clone(&self.shutdown);
136 let wakeup_ms = self.evictor_wakeup_ms;
137 let evictor = Arc::clone(&evictor);
138 let wake = self.evictor_wake.clone();
139
140 self.evictor_handle = Some(thread::spawn(move || {
141 log::info!("Evictor daemon started");
142 while !shutdown.load(Ordering::Relaxed) {
143 let notified =
145 wake.wait_timeout(Duration::from_millis(wakeup_ms));
146 if notified || shutdown.load(Ordering::Relaxed) {
147 break;
148 }
149
150 let result = evictor.do_evict(EvictionSource::Daemon);
152 if result.nodes_evicted > 0 {
153 log::debug!(
154 "Evictor: evicted {} nodes, {} bytes",
155 result.nodes_evicted,
156 result.bytes_evicted
157 );
158 }
159 }
160 log::info!("Evictor daemon stopped");
161 }));
162 }
163
164 if self.cleaner_enabled {
166 let shutdown = Arc::clone(&self.shutdown);
167 let wakeup_ms = self.cleaner_wakeup_ms;
168 let cleaner = Arc::clone(&cleaner);
169 let wake = self.cleaner_wake.clone();
170
171 self.cleaner_handle = Some(thread::spawn(move || {
172 log::info!("Cleaner daemon started");
173 while !shutdown.load(Ordering::Relaxed) {
174 let notified =
176 wake.wait_timeout(Duration::from_millis(wakeup_ms));
177 if notified || shutdown.load(Ordering::Relaxed) {
178 break;
179 }
180
181 match cleaner.do_clean(1, false) {
183 Ok(result) => {
184 if result.files_cleaned > 0 {
185 log::debug!(
186 "Cleaner: cleaned {} files, deleted {} files",
187 result.files_cleaned,
188 result.files_deleted
189 );
190 }
191 }
192 Err(e) => {
193 log::warn!("Cleaner error: {}", e);
194 }
195 }
196 }
197 log::info!("Cleaner daemon stopped");
198 }));
199 }
200
201 if self.checkpointer_enabled {
203 let shutdown = Arc::clone(&self.shutdown);
204 let wakeup_ms = self.checkpointer_wakeup_ms;
205 let checkpointer = Arc::clone(&checkpointer);
206 let wake = self.checkpointer_wake.clone();
207
208 self.checkpointer_handle = Some(thread::spawn(move || {
209 log::info!("Checkpointer daemon started");
210 while !shutdown.load(Ordering::Relaxed) {
211 let notified =
213 wake.wait_timeout(Duration::from_millis(wakeup_ms));
214 if notified || shutdown.load(Ordering::Relaxed) {
215 break;
216 }
217
218 if !checkpointer.is_runnable(false) {
222 continue;
223 }
224 match checkpointer.do_checkpoint("daemon") {
226 Ok(result) => {
227 log::debug!(
228 "Checkpoint: id={}, flushed {} nodes",
229 result.checkpoint_id,
230 result.total_nodes_flushed()
231 );
232 }
233 Err(e) => {
234 log::warn!("Checkpoint error: {}", e);
235 }
236 }
237 }
238 log::info!("Checkpointer daemon stopped");
239 }));
240 }
241 }
242
243 pub fn shutdown(&mut self) {
259 self.shutdown.store(true, Ordering::Relaxed);
262 self.cleaner_wake.notify();
263 self.checkpointer_wake.notify();
264 self.evictor_wake.notify();
265
266 if let Some(handle) = self.cleaner_handle.take()
268 && let Err(e) = handle.join()
269 {
270 log::error!("Failed to join cleaner thread: {:?}", e);
271 }
272
273 if let Some(handle) = self.checkpointer_handle.take()
275 && let Err(e) = handle.join()
276 {
277 log::error!("Failed to join checkpointer thread: {:?}", e);
278 }
279
280 if let Some(handle) = self.evictor_handle.take()
283 && let Err(e) = handle.join()
284 {
285 log::error!("Failed to join evictor thread: {:?}", e);
286 }
287 }
288
289 pub fn is_running(&self) -> bool {
302 !self.shutdown.load(Ordering::Relaxed)
306 }
307
308 pub fn running_count(&self) -> usize {
310 let mut count = 0;
311 if self.evictor_enabled && self.evictor_handle.is_some() {
312 count += 1;
313 }
314 if self.cleaner_enabled && self.cleaner_handle.is_some() {
315 count += 1;
316 }
317 if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
318 count += 1;
319 }
320 count
321 }
322}
323
324impl Drop for DaemonManager {
325 fn drop(&mut self) {
326 if self.is_running() {
328 self.shutdown();
329 }
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use noxu_evictor::Arbiter;
337 use noxu_recovery::CheckpointConfig;
338 use std::sync::atomic::AtomicI64;
339
340 #[test]
341 fn test_daemon_manager_creation() {
342 let config = EngineConfig::default();
343 let manager = DaemonManager::new(&config);
344
345 assert!(manager.evictor_enabled);
346 assert!(manager.cleaner_enabled);
347 assert!(manager.checkpointer_enabled);
348 assert!(manager.is_running());
349 assert_eq!(manager.running_count(), 0); }
351
352 #[test]
353 fn test_daemon_manager_with_disabled_daemons() {
354 let config = EngineConfig::default()
355 .evictor_enabled(false)
356 .cleaner_enabled(false)
357 .checkpointer_enabled(false);
358 let manager = DaemonManager::new(&config);
359
360 assert!(!manager.evictor_enabled);
361 assert!(!manager.cleaner_enabled);
362 assert!(!manager.checkpointer_enabled);
363 }
364
365 #[test]
366 fn test_daemon_manager_start_and_shutdown() {
367 let config = EngineConfig::default()
368 .evictor_wakeup_interval_ms(100)
369 .cleaner_wakeup_interval_ms(100)
370 .checkpointer_wakeup_interval_ms(100);
371
372 let mut manager = DaemonManager::new(&config);
373
374 let usage = Arc::new(AtomicI64::new(500));
376 let arbiter = Arbiter::new(1000, usage, 100, 200);
377 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
378 let cleaner = Arc::new(Cleaner::new(50, 5, 0));
379 let checkpointer =
380 Arc::new(Checkpointer::new(CheckpointConfig::default()));
381
382 manager.start_daemons(evictor, cleaner, checkpointer);
384
385 thread::sleep(Duration::from_millis(50));
387 assert!(manager.is_running());
388 assert_eq!(manager.running_count(), 3);
389
390 manager.shutdown();
392 assert!(!manager.is_running());
393 }
394
395 #[test]
396 fn test_daemon_manager_selective_daemons() {
397 let config = EngineConfig::default()
398 .evictor_enabled(true)
399 .cleaner_enabled(false)
400 .checkpointer_enabled(true)
401 .evictor_wakeup_interval_ms(100)
402 .checkpointer_wakeup_interval_ms(100);
403
404 let mut manager = DaemonManager::new(&config);
405
406 let usage = Arc::new(AtomicI64::new(500));
407 let arbiter = Arbiter::new(1000, usage, 100, 200);
408 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
409 let cleaner = Arc::new(Cleaner::new(50, 5, 0));
410 let checkpointer =
411 Arc::new(Checkpointer::new(CheckpointConfig::default()));
412
413 manager.start_daemons(evictor, cleaner, checkpointer);
414
415 thread::sleep(Duration::from_millis(50));
416 assert_eq!(manager.running_count(), 2); manager.shutdown();
419 }
420
421 #[test]
422 fn test_daemon_manager_drop_cleanup() {
423 let config = EngineConfig::default()
424 .evictor_wakeup_interval_ms(100)
425 .cleaner_wakeup_interval_ms(100)
426 .checkpointer_wakeup_interval_ms(100);
427
428 let mut manager = DaemonManager::new(&config);
429
430 let usage = Arc::new(AtomicI64::new(500));
431 let arbiter = Arbiter::new(1000, usage, 100, 200);
432 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
433 let cleaner = Arc::new(Cleaner::new(50, 5, 0));
434 let checkpointer =
435 Arc::new(Checkpointer::new(CheckpointConfig::default()));
436
437 manager.start_daemons(evictor, cleaner, checkpointer);
438
439 thread::sleep(Duration::from_millis(50));
440 assert!(manager.is_running());
441
442 drop(manager);
444 }
445
446 #[test]
447 fn test_daemon_wakeup_intervals() {
448 let config = EngineConfig::default()
449 .evictor_wakeup_interval_ms(1000)
450 .cleaner_wakeup_interval_ms(2000)
451 .checkpointer_wakeup_interval_ms(3000);
452
453 let manager = DaemonManager::new(&config);
454 assert_eq!(manager.evictor_wakeup_ms, 1000);
455 assert_eq!(manager.cleaner_wakeup_ms, 2000);
456 assert_eq!(manager.checkpointer_wakeup_ms, 3000);
457 }
458
459 #[test]
463 fn test_shutdown_wakes_daemons_early() {
464 use std::time::Instant;
465
466 let config = EngineConfig::default()
468 .evictor_wakeup_interval_ms(5000)
469 .cleaner_wakeup_interval_ms(5000)
470 .checkpointer_wakeup_interval_ms(5000);
471
472 let mut manager = DaemonManager::new(&config);
473
474 let usage = Arc::new(AtomicI64::new(500));
475 let arbiter = Arbiter::new(1000, usage, 100, 200);
476 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
477 let cleaner = Arc::new(Cleaner::new(50, 5, 0));
478 let checkpointer =
479 Arc::new(Checkpointer::new(CheckpointConfig::default()));
480
481 manager.start_daemons(evictor, cleaner, checkpointer);
482
483 thread::sleep(Duration::from_millis(50));
485
486 let start = Instant::now();
487 manager.shutdown();
488 let elapsed = start.elapsed();
489
490 assert!(
492 elapsed < Duration::from_secs(1),
493 "shutdown took {:?}, expected < 1s",
494 elapsed
495 );
496 }
497
498 #[test]
499 fn test_wake_handle_timeout() {
500 let handle = WakeHandle::new();
501
502 let notified = handle.wait_timeout(Duration::from_millis(50));
504 assert!(!notified);
505 }
506
507 #[test]
508 fn test_wake_handle_notify() {
509 use std::time::Instant;
510
511 let handle = WakeHandle::new();
512 let handle2 = handle.clone();
513
514 let t = thread::spawn(move || {
516 thread::sleep(Duration::from_millis(20));
517 handle2.notify();
518 });
519
520 let start = Instant::now();
521 let notified = handle.wait_timeout(Duration::from_secs(5));
523 let elapsed = start.elapsed();
524
525 t.join().unwrap();
526
527 assert!(notified, "expected notify to return true");
528 assert!(
529 elapsed < Duration::from_millis(500),
530 "took {:?}, expected wakeup within 500ms",
531 elapsed
532 );
533 }
534
535 #[test]
554 fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
555 use std::sync::Mutex;
556 use std::time::Instant;
557
558 let join_seq: Arc<Mutex<Vec<&'static str>>> =
562 Arc::new(Mutex::new(Vec::new()));
563
564 let shutdown_flag = Arc::new(AtomicBool::new(false));
565
566 let cleaner_joined =
568 Arc::new((Mutex::new(false), std::sync::Condvar::new()));
569 let checkpointer_joined =
570 Arc::new((Mutex::new(false), std::sync::Condvar::new()));
571
572 let wake_c = WakeHandle::new();
573 let wake_cp = WakeHandle::new();
574 let wake_ev = WakeHandle::new();
575
576 let sd_c = shutdown_flag.clone();
578 let wake_c2 = wake_c.clone();
579 let cleaner_t = thread::spawn(move || {
580 while !sd_c.load(Ordering::Relaxed) {
581 wake_c2.wait_timeout(Duration::from_millis(5000));
582 }
583 });
585
586 let sd_cp = shutdown_flag.clone();
588 let wake_cp2 = wake_cp.clone();
589 let cj = cleaner_joined.clone();
590 let checkpointer_t = thread::spawn(move || {
591 while !sd_cp.load(Ordering::Relaxed) {
592 wake_cp2.wait_timeout(Duration::from_millis(5000));
593 }
594 let (lock, cv) = &*cj;
596 let mut g = lock.lock().unwrap();
597 while !*g {
598 g = cv.wait(g).unwrap();
599 }
600 });
601
602 let sd_ev = shutdown_flag.clone();
604 let wake_ev2 = wake_ev.clone();
605 let cpj = checkpointer_joined.clone();
606 let evictor_t = thread::spawn(move || {
607 while !sd_ev.load(Ordering::Relaxed) {
608 wake_ev2.wait_timeout(Duration::from_millis(5000));
609 }
610 let (lock, cv) = &*cpj;
611 let mut g = lock.lock().unwrap();
612 while !*g {
613 g = cv.wait(g).unwrap();
614 }
615 });
616
617 shutdown_flag.store(true, Ordering::Relaxed);
619 wake_c.notify();
620 wake_cp.notify();
621 wake_ev.notify();
622
623 let start = Instant::now();
624
625 cleaner_t.join().unwrap();
627 join_seq.lock().unwrap().push("cleaner");
628 {
629 let (l, cv) = &*cleaner_joined;
630 *l.lock().unwrap() = true;
631 cv.notify_all();
632 }
633
634 checkpointer_t.join().unwrap();
636 join_seq.lock().unwrap().push("checkpointer");
637 {
638 let (l, cv) = &*checkpointer_joined;
639 *l.lock().unwrap() = true;
640 cv.notify_all();
641 }
642
643 evictor_t.join().unwrap();
645 join_seq.lock().unwrap().push("evictor");
646
647 let elapsed = start.elapsed();
648 assert!(
649 elapsed < Duration::from_secs(2),
650 "CC-3: shutdown stalled: {:?}",
651 elapsed
652 );
653
654 let order = join_seq.lock().unwrap();
655 assert_eq!(
656 *order,
657 vec!["cleaner", "checkpointer", "evictor"],
658 "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
659 );
660 }
661
662 #[test]
666 fn test_cc3_shutdown_no_deadlock_bounded_time() {
667 use std::time::Instant;
668
669 let config = EngineConfig::default()
671 .evictor_wakeup_interval_ms(10_000)
672 .cleaner_wakeup_interval_ms(10_000)
673 .checkpointer_wakeup_interval_ms(10_000);
674
675 let mut manager = DaemonManager::new(&config);
676
677 let usage = Arc::new(AtomicI64::new(500));
678 let arbiter = Arbiter::new(1000, usage, 100, 200);
679 let evictor = Arc::new(Evictor::new(arbiter, 100, false));
680 let cleaner = Arc::new(Cleaner::new(50, 5, 0));
681 let checkpointer =
682 Arc::new(Checkpointer::new(CheckpointConfig::default()));
683
684 manager.start_daemons(evictor, cleaner, checkpointer);
685 thread::sleep(Duration::from_millis(30));
686
687 let start = Instant::now();
688 manager.shutdown();
689 let elapsed = start.elapsed();
690
691 assert!(
692 elapsed < Duration::from_secs(2),
693 "CC-3: shutdown deadlocked or stalled: took {:?}",
694 elapsed
695 );
696 assert!(!manager.is_running());
697 }
698}