1use crate::engine_config::EngineConfig;
4use noxu_cleaner::Cleaner;
5use noxu_evictor::{EvictionSource, Evictor};
6use noxu_recovery::Checkpointer;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread::{self, JoinHandle};
10use std::time::Duration;
11
/// Cloneable wake-up signal shared between a sleeping thread and its notifier.
///
/// Every clone refers to the same flag/condvar pair. The flag is sticky:
/// once [`notify`](Self::notify) has been called it stays set, because
/// nothing in this type ever clears it.
#[derive(Clone)]
struct WakeHandle {
    /// The `bool` is `true` once `notify` has run; the condvar wakes threads
    /// blocked in `wait_timeout`.
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    /// Creates a handle whose flag is initially unset.
    fn new() -> Self {
        Self { pair: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Blocks until the handle has been notified or `duration` elapses.
    ///
    /// Returns `true` if the flag is set (whether it was set before or during
    /// the wait) and `false` if the whole timeout passed without a
    /// notification.
    ///
    /// The flag is tested *before* sleeping, so a `notify` that raced ahead
    /// of this call is not lost, and spurious condvar wake-ups do not cut the
    /// wait short.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn wait_timeout(&self, duration: Duration) -> bool {
        let (lock, cvar) = &*self.pair;
        let guard = lock.lock().unwrap();
        // Keep waiting only while the flag is still unset; the predicate is
        // evaluated once up front, which closes the notify-before-wait race.
        let (guard, _timed_out) = cvar
            .wait_timeout_while(guard, duration, |notified| !*notified)
            .unwrap();
        *guard
    }

    /// Sets the flag and wakes every thread currently blocked in
    /// [`wait_timeout`](Self::wait_timeout).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn notify(&self) {
        let (lock, cvar) = &*self.pair;
        // The temporary guard is released at the end of this statement, so
        // woken threads can re-acquire the mutex immediately.
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }
}
45
/// Owns the background daemon threads (evictor, cleaner, checkpointer) and
/// the shared state used to start them and shut them down.
pub struct DaemonManager {
    /// Set to `true` by `shutdown`; every daemon loop polls it and exits once
    /// it observes the flag.
    shutdown: Arc<AtomicBool>,

    /// Wake handle the evictor thread sleeps on between passes.
    evictor_wake: WakeHandle,
    /// Wake handle the cleaner thread sleeps on between passes.
    cleaner_wake: WakeHandle,
    /// Wake handle the checkpointer thread sleeps on between passes.
    checkpointer_wake: WakeHandle,

    /// Join handle of the evictor thread; `None` until `start_daemons` spawns
    /// it and again after `shutdown` has joined it.
    evictor_handle: Option<JoinHandle<()>>,

    /// Join handle of the cleaner thread; `None` until `start_daemons` spawns
    /// it and again after `shutdown` has joined it.
    cleaner_handle: Option<JoinHandle<()>>,

    /// Join handle of the checkpointer thread; `None` until `start_daemons`
    /// spawns it and again after `shutdown` has joined it.
    checkpointer_handle: Option<JoinHandle<()>>,

    /// Whether `start_daemons` should spawn the evictor (copied from the config).
    evictor_enabled: bool,

    /// Whether `start_daemons` should spawn the cleaner (copied from the config).
    cleaner_enabled: bool,

    /// Whether `start_daemons` should spawn the checkpointer (copied from the config).
    checkpointer_enabled: bool,

    /// Maximum time, in milliseconds, the evictor sleeps between passes.
    evictor_wakeup_ms: u64,

    /// Maximum time, in milliseconds, the cleaner sleeps between passes.
    cleaner_wakeup_ms: u64,

    /// Maximum time, in milliseconds, the checkpointer sleeps between passes.
    checkpointer_wakeup_ms: u64,
}
92
93impl DaemonManager {
94 pub fn new(config: &EngineConfig) -> Self {
98 Self {
99 shutdown: Arc::new(AtomicBool::new(false)),
100 evictor_wake: WakeHandle::new(),
101 cleaner_wake: WakeHandle::new(),
102 checkpointer_wake: WakeHandle::new(),
103 evictor_handle: None,
104 cleaner_handle: None,
105 checkpointer_handle: None,
106 evictor_enabled: config.evictor_enabled,
107 cleaner_enabled: config.cleaner_enabled,
108 checkpointer_enabled: config.checkpointer_enabled,
109 evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
110 cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
111 checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
112 }
113 }
114
115 pub fn start_daemons(
128 &mut self,
129 evictor: Arc<Evictor>,
130 cleaner: Arc<Cleaner>,
131 checkpointer: Arc<Checkpointer>,
132 ) {
133 if self.evictor_enabled {
135 let shutdown = Arc::clone(&self.shutdown);
136 let wakeup_ms = self.evictor_wakeup_ms;
137 let evictor = Arc::clone(&evictor);
138 let wake = self.evictor_wake.clone();
139
140 self.evictor_handle = Some(thread::spawn(move || {
141 log::info!("Evictor daemon started");
142 while !shutdown.load(Ordering::Relaxed) {
143 let notified =
145 wake.wait_timeout(Duration::from_millis(wakeup_ms));
146 if notified || shutdown.load(Ordering::Relaxed) {
147 break;
148 }
149
150 let result = evictor.do_evict(EvictionSource::Daemon);
152 if result.nodes_evicted > 0 {
153 log::debug!(
154 "Evictor: evicted {} nodes, {} bytes",
155 result.nodes_evicted,
156 result.bytes_evicted
157 );
158 }
159 }
160 log::info!("Evictor daemon stopped");
161 }));
162 }
163
164 if self.cleaner_enabled {
166 let shutdown = Arc::clone(&self.shutdown);
167 let wakeup_ms = self.cleaner_wakeup_ms;
168 let cleaner = Arc::clone(&cleaner);
169 let wake = self.cleaner_wake.clone();
170
171 self.cleaner_handle = Some(thread::spawn(move || {
172 log::info!("Cleaner daemon started");
173 while !shutdown.load(Ordering::Relaxed) {
174 let notified =
176 wake.wait_timeout(Duration::from_millis(wakeup_ms));
177 if notified || shutdown.load(Ordering::Relaxed) {
178 break;
179 }
180
181 match cleaner.do_clean(1, false) {
183 Ok(result) => {
184 if result.files_cleaned > 0 {
185 log::debug!(
186 "Cleaner: cleaned {} files, deleted {} files",
187 result.files_cleaned,
188 result.files_deleted
189 );
190 }
191 }
192 Err(e) => {
193 log::warn!("Cleaner error: {}", e);
194 }
195 }
196 }
197 log::info!("Cleaner daemon stopped");
198 }));
199 }
200
201 if self.checkpointer_enabled {
203 let shutdown = Arc::clone(&self.shutdown);
204 let wakeup_ms = self.checkpointer_wakeup_ms;
205 let checkpointer = Arc::clone(&checkpointer);
206 let wake = self.checkpointer_wake.clone();
207
208 self.checkpointer_handle = Some(thread::spawn(move || {
209 log::info!("Checkpointer daemon started");
210 while !shutdown.load(Ordering::Relaxed) {
211 let notified =
213 wake.wait_timeout(Duration::from_millis(wakeup_ms));
214 if notified || shutdown.load(Ordering::Relaxed) {
215 break;
216 }
217
218 match checkpointer.do_checkpoint("daemon") {
220 Ok(result) => {
221 log::debug!(
222 "Checkpoint: id={}, flushed {} nodes",
223 result.checkpoint_id,
224 result.total_nodes_flushed()
225 );
226 }
227 Err(e) => {
228 log::warn!("Checkpoint error: {}", e);
229 }
230 }
231 }
232 log::info!("Checkpointer daemon stopped");
233 }));
234 }
235 }
236
237 pub fn shutdown(&mut self) {
245 self.shutdown.store(true, Ordering::Relaxed);
247
248 self.evictor_wake.notify();
251 self.cleaner_wake.notify();
252 self.checkpointer_wake.notify();
253
254 if let Some(handle) = self.evictor_handle.take()
256 && let Err(e) = handle.join()
257 {
258 log::error!("Failed to join evictor thread: {:?}", e);
259 }
260
261 if let Some(handle) = self.cleaner_handle.take()
263 && let Err(e) = handle.join()
264 {
265 log::error!("Failed to join cleaner thread: {:?}", e);
266 }
267
268 if let Some(handle) = self.checkpointer_handle.take()
270 && let Err(e) = handle.join()
271 {
272 log::error!("Failed to join checkpointer thread: {:?}", e);
273 }
274 }
275
276 pub fn is_running(&self) -> bool {
289 !self.shutdown.load(Ordering::Relaxed)
293 }
294
295 pub fn running_count(&self) -> usize {
297 let mut count = 0;
298 if self.evictor_enabled && self.evictor_handle.is_some() {
299 count += 1;
300 }
301 if self.cleaner_enabled && self.cleaner_handle.is_some() {
302 count += 1;
303 }
304 if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
305 count += 1;
306 }
307 count
308 }
309}
310
311impl Drop for DaemonManager {
312 fn drop(&mut self) {
313 if self.is_running() {
315 self.shutdown();
316 }
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;
    use std::time::Instant;

    /// Pause that lets freshly spawned daemon threads come up before a test
    /// inspects the manager.
    const SETTLE: Duration = Duration::from_millis(50);

    /// Builds the evictor, cleaner and checkpointer shared by the thread-spawning tests.
    fn build_workers() -> (Arc<Evictor>, Arc<Cleaner>, Arc<Checkpointer>) {
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        (
            Arc::new(Evictor::new(arbiter, 100, false)),
            Arc::new(Cleaner::new(50, 5, 0)),
            Arc::new(Checkpointer::new(CheckpointConfig::default())),
        )
    }

    /// Creates a manager for `config`, starts its daemons on fresh workers and
    /// waits `SETTLE` before handing it back.
    fn start_manager(config: &EngineConfig) -> DaemonManager {
        let mut daemons = DaemonManager::new(config);
        let (evictor, cleaner, checkpointer) = build_workers();
        daemons.start_daemons(evictor, cleaner, checkpointer);
        thread::sleep(SETTLE);
        daemons
    }

    #[test]
    fn test_daemon_manager_creation() {
        let daemons = DaemonManager::new(&EngineConfig::default());

        assert!(daemons.evictor_enabled);
        assert!(daemons.cleaner_enabled);
        assert!(daemons.checkpointer_enabled);
        assert!(daemons.is_running());
        // Nothing has been spawned yet.
        assert_eq!(daemons.running_count(), 0);
    }

    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let all_off = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let daemons = DaemonManager::new(&all_off);

        assert!(!daemons.evictor_enabled);
        assert!(!daemons.cleaner_enabled);
        assert!(!daemons.checkpointer_enabled);
    }

    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let fast = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut daemons = start_manager(&fast);
        assert!(daemons.is_running());
        assert_eq!(daemons.running_count(), 3);

        daemons.shutdown();
        assert!(!daemons.is_running());
    }

    #[test]
    fn test_daemon_manager_selective_daemons() {
        let without_cleaner = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let mut daemons = start_manager(&without_cleaner);
        // Only the evictor and the checkpointer were enabled.
        assert_eq!(daemons.running_count(), 2);

        daemons.shutdown();
    }

    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let fast = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);

        let daemons = start_manager(&fast);
        assert!(daemons.is_running());

        // Dropping the manager must stop and join the threads by itself.
        drop(daemons);
    }

    #[test]
    fn test_daemon_wakeup_intervals() {
        let staggered = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);

        let daemons = DaemonManager::new(&staggered);
        assert_eq!(daemons.evictor_wakeup_ms, 1000);
        assert_eq!(daemons.cleaner_wakeup_ms, 2000);
        assert_eq!(daemons.checkpointer_wakeup_ms, 3000);
    }

    #[test]
    fn test_shutdown_wakes_daemons_early() {
        // Long intervals: a shutdown that waited for the timers would take seconds.
        let slow = EngineConfig::default()
            .evictor_wakeup_interval_ms(5000)
            .cleaner_wakeup_interval_ms(5000)
            .checkpointer_wakeup_interval_ms(5000);

        let mut daemons = start_manager(&slow);

        let begun = Instant::now();
        daemons.shutdown();
        let elapsed = begun.elapsed();

        assert!(
            elapsed < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            elapsed
        );
    }

    #[test]
    fn test_wake_handle_timeout() {
        let waiter = WakeHandle::new();

        // Nobody notifies, so the wait must run out and report `false`.
        let notified = waiter.wait_timeout(Duration::from_millis(50));
        assert!(!notified);
    }

    #[test]
    fn test_wake_handle_notify() {
        let waiter = WakeHandle::new();
        let notifier = waiter.clone();

        let notifier_thread = thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            notifier.notify();
        });

        let begun = Instant::now();
        let notified = waiter.wait_timeout(Duration::from_secs(5));
        let elapsed = begun.elapsed();

        notifier_thread.join().unwrap();

        assert!(notified, "expected notify to return true");
        assert!(
            elapsed < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            elapsed
        );
    }
}