use crate::engine_config::EngineConfig;
use noxu_cleaner::Cleaner;
use noxu_evictor::{EvictionSource, Evictor};
use noxu_recovery::Checkpointer;
use noxu_util::dst_sync::atomic::{AtomicBool, Ordering};
use noxu_util::dst_sync::{Arc, Condvar, Mutex, thread};
use std::time::Duration;
/// Cloneable wake-up signal backed by a shared `(Mutex<bool>, Condvar)` pair.
///
/// The boolean records whether [`WakeHandle::notify`] has been called. Nothing
/// in this type ever resets it, so once a handle has been notified every later
/// [`WakeHandle::wait_timeout`] call on any clone returns `true` immediately.
#[derive(Clone)]
#[doc(hidden)]
pub struct WakeHandle {
    /// Shared "notified" flag plus the condition variable used to wake waiters.
    pair: Arc<(Mutex<bool>, Condvar)>,
}

impl WakeHandle {
    /// Creates a handle whose "notified" flag starts out `false`.
    pub(crate) fn new() -> Self {
        let pair = Arc::new((Mutex::new(false), Condvar::new()));
        Self { pair }
    }

    /// Blocks for at most `duration`, or until [`WakeHandle::notify`] is called.
    ///
    /// Returns the value of the "notified" flag observed on wake-up: `true` if
    /// the handle has been notified, `false` otherwise.
    ///
    /// NOTE(review): the condvar is waited on once, not in a loop. Assuming it
    /// has `std::sync::Condvar` semantics, a spurious wake-up makes this return
    /// `false` before `duration` has fully elapsed.
    ///
    /// # Panics
    ///
    /// Panics if locking the mutex or waiting on the condvar returns an error.
    #[doc(hidden)]
    pub fn wait_timeout(&self, duration: Duration) -> bool {
        let (flag_mutex, condvar) = &*self.pair;
        let notified = flag_mutex.lock().unwrap();
        if *notified {
            // Already signalled: no need to block at all.
            true
        } else {
            let (notified, _timeout_result) =
                condvar.wait_timeout(notified, duration).unwrap();
            *notified
        }
    }

    /// Sets the "notified" flag and wakes every thread currently waiting.
    ///
    /// # Panics
    ///
    /// Panics if locking the mutex returns an error.
    #[doc(hidden)]
    pub fn notify(&self) {
        let (flag_mutex, condvar) = &*self.pair;
        {
            // Release the lock before notifying, as the original statement did.
            let mut notified = flag_mutex.lock().unwrap();
            *notified = true;
        }
        condvar.notify_all();
    }
}
/// Extra constructor that is only compiled when the `noxu_shuttle` cfg is set.
#[cfg(noxu_shuttle)]
impl WakeHandle {
    /// Public counterpart of the crate-private `WakeHandle::new`; returns a
    /// fresh handle that has not been notified.
    #[doc(hidden)]
    pub fn new_for_shuttle() -> Self {
        WakeHandle::new()
    }
}
pub struct DaemonManager {
shutdown: Arc<AtomicBool>,
evictor_wake: WakeHandle,
cleaner_wake: WakeHandle,
checkpointer_wake: WakeHandle,
evictor_handle: Option<thread::JoinHandle<()>>,
cleaner_handle: Option<thread::JoinHandle<()>>,
checkpointer_handle: Option<thread::JoinHandle<()>>,
evictor_enabled: bool,
cleaner_enabled: bool,
checkpointer_enabled: bool,
evictor_wakeup_ms: u64,
cleaner_wakeup_ms: u64,
checkpointer_wakeup_ms: u64,
}
impl DaemonManager {
pub fn new(config: &EngineConfig) -> Self {
Self {
shutdown: Arc::new(AtomicBool::new(false)),
evictor_wake: WakeHandle::new(),
cleaner_wake: WakeHandle::new(),
checkpointer_wake: WakeHandle::new(),
evictor_handle: None,
cleaner_handle: None,
checkpointer_handle: None,
evictor_enabled: config.evictor_enabled,
cleaner_enabled: config.cleaner_enabled,
checkpointer_enabled: config.checkpointer_enabled,
evictor_wakeup_ms: config.evictor_wakeup_interval_ms,
cleaner_wakeup_ms: config.cleaner_wakeup_interval_ms,
checkpointer_wakeup_ms: config.checkpointer_wakeup_interval_ms,
}
}
pub fn start_daemons(
&mut self,
evictor: Arc<Evictor>,
cleaner: Arc<Cleaner>,
checkpointer: Arc<Checkpointer>,
) {
if self.evictor_enabled {
let shutdown = Arc::clone(&self.shutdown);
let wakeup_ms = self.evictor_wakeup_ms;
let evictor = Arc::clone(&evictor);
let wake = self.evictor_wake.clone();
self.evictor_handle = Some(thread::spawn(move || {
log::info!("Evictor daemon started");
while !shutdown.load(Ordering::Relaxed) {
let notified =
wake.wait_timeout(Duration::from_millis(wakeup_ms));
if notified || shutdown.load(Ordering::Relaxed) {
break;
}
let result = evictor.do_evict(EvictionSource::Daemon);
if result.nodes_evicted > 0 {
log::debug!(
"Evictor: evicted {} nodes, {} bytes",
result.nodes_evicted,
result.bytes_evicted
);
}
}
log::info!("Evictor daemon stopped");
}));
}
if self.cleaner_enabled {
let shutdown = Arc::clone(&self.shutdown);
let wakeup_ms = self.cleaner_wakeup_ms;
let cleaner = Arc::clone(&cleaner);
let wake = self.cleaner_wake.clone();
self.cleaner_handle = Some(thread::spawn(move || {
log::info!("Cleaner daemon started");
while !shutdown.load(Ordering::Relaxed) {
let notified =
wake.wait_timeout(Duration::from_millis(wakeup_ms));
if notified || shutdown.load(Ordering::Relaxed) {
break;
}
match cleaner.do_clean(1, false) {
Ok(result) => {
if result.files_cleaned > 0 {
log::debug!(
"Cleaner: cleaned {} files, deleted {} files",
result.files_cleaned,
result.files_deleted
);
}
}
Err(e) => {
log::warn!("Cleaner error: {}", e);
}
}
}
log::info!("Cleaner daemon stopped");
}));
}
if self.checkpointer_enabled {
let shutdown = Arc::clone(&self.shutdown);
let wakeup_ms = self.checkpointer_wakeup_ms;
let checkpointer = Arc::clone(&checkpointer);
let wake = self.checkpointer_wake.clone();
self.checkpointer_handle = Some(thread::spawn(move || {
log::info!("Checkpointer daemon started");
while !shutdown.load(Ordering::Relaxed) {
let notified =
wake.wait_timeout(Duration::from_millis(wakeup_ms));
if notified || shutdown.load(Ordering::Relaxed) {
break;
}
if !checkpointer.is_runnable(false) {
continue;
}
match checkpointer.do_checkpoint("daemon") {
Ok(result) => {
log::debug!(
"Checkpoint: id={}, flushed {} nodes",
result.checkpoint_id,
result.total_nodes_flushed()
);
}
Err(e) => {
log::warn!("Checkpoint error: {}", e);
}
}
}
log::info!("Checkpointer daemon stopped");
}));
}
}
pub fn shutdown(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
self.cleaner_wake.notify();
self.checkpointer_wake.notify();
self.evictor_wake.notify();
if let Some(handle) = self.cleaner_handle.take()
&& let Err(e) = handle.join()
{
log::error!("Failed to join cleaner thread: {:?}", e);
}
if let Some(handle) = self.checkpointer_handle.take()
&& let Err(e) = handle.join()
{
log::error!("Failed to join checkpointer thread: {:?}", e);
}
if let Some(handle) = self.evictor_handle.take()
&& let Err(e) = handle.join()
{
log::error!("Failed to join evictor thread: {:?}", e);
}
}
pub fn is_running(&self) -> bool {
!self.shutdown.load(Ordering::Relaxed)
}
pub fn running_count(&self) -> usize {
let mut count = 0;
if self.evictor_enabled && self.evictor_handle.is_some() {
count += 1;
}
if self.cleaner_enabled && self.cleaner_handle.is_some() {
count += 1;
}
if self.checkpointer_enabled && self.checkpointer_handle.is_some() {
count += 1;
}
count
}
}
/// Stops and joins the daemon threads when the manager goes out of scope.
impl Drop for DaemonManager {
    /// Calls `shutdown` unless shutdown has already been requested.
    fn drop(&mut self) {
        if !self.is_running() {
            // Shutdown flag already set: nothing left to do here.
            return;
        }
        self.shutdown();
    }
}
/// Re-export module compiled only when the `noxu_shuttle` cfg is set.
///
/// NOTE(review): presumably the entry point used by shuttle-based
/// deterministic tests outside this module; confirm against the callers.
#[cfg(noxu_shuttle)]
pub mod dst_hooks {
// Makes the otherwise doc-hidden `WakeHandle` reachable as `dst_hooks::WakeHandle`.
pub use super::WakeHandle;
}
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_evictor::Arbiter;
    use noxu_recovery::CheckpointConfig;
    use std::sync::atomic::AtomicI64;
    use std::time::Instant;

    /// Builds the evictor, cleaner and checkpointer fixture shared by the
    /// tests that actually start daemon threads.
    fn build_components() -> (Arc<Evictor>, Arc<Cleaner>, Arc<Checkpointer>) {
        let usage = Arc::new(AtomicI64::new(500));
        let arbiter = Arbiter::new(1000, usage, 100, 200);
        (
            Arc::new(Evictor::new(arbiter, 100, false)),
            Arc::new(Cleaner::new(50, 5, 0)),
            Arc::new(Checkpointer::new(CheckpointConfig::default())),
        )
    }

    /// Creates a manager from `config` and starts its daemons with the
    /// standard fixture.
    fn started_manager(config: &EngineConfig) -> DaemonManager {
        let mut manager = DaemonManager::new(config);
        let (evictor, cleaner, checkpointer) = build_components();
        manager.start_daemons(evictor, cleaner, checkpointer);
        manager
    }

    /// A fresh manager has everything enabled by default and nothing started.
    #[test]
    fn test_daemon_manager_creation() {
        let manager = DaemonManager::new(&EngineConfig::default());
        assert!(manager.evictor_enabled);
        assert!(manager.cleaner_enabled);
        assert!(manager.checkpointer_enabled);
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 0);
    }

    /// Disabling each daemon in the config is reflected in the manager.
    #[test]
    fn test_daemon_manager_with_disabled_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(false)
            .cleaner_enabled(false)
            .checkpointer_enabled(false);
        let manager = DaemonManager::new(&config);
        assert!(!manager.evictor_enabled);
        assert!(!manager.cleaner_enabled);
        assert!(!manager.checkpointer_enabled);
    }

    /// All three daemons start, are counted, and stop on `shutdown`.
    #[test]
    fn test_daemon_manager_start_and_shutdown() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        let mut manager = started_manager(&config);
        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        assert_eq!(manager.running_count(), 3);
        manager.shutdown();
        assert!(!manager.is_running());
    }

    /// Only the enabled daemons are started and counted.
    #[test]
    fn test_daemon_manager_selective_daemons() {
        let config = EngineConfig::default()
            .evictor_enabled(true)
            .cleaner_enabled(false)
            .checkpointer_enabled(true)
            .evictor_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        let mut manager = started_manager(&config);
        thread::sleep(Duration::from_millis(50));
        assert_eq!(manager.running_count(), 2);
        manager.shutdown();
    }

    /// Dropping a running manager must shut the daemons down without hanging.
    #[test]
    fn test_daemon_manager_drop_cleanup() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(100)
            .cleaner_wakeup_interval_ms(100)
            .checkpointer_wakeup_interval_ms(100);
        let manager = started_manager(&config);
        thread::sleep(Duration::from_millis(50));
        assert!(manager.is_running());
        drop(manager);
    }

    /// Wake-up intervals are copied from the config verbatim.
    #[test]
    fn test_daemon_wakeup_intervals() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(1000)
            .cleaner_wakeup_interval_ms(2000)
            .checkpointer_wakeup_interval_ms(3000);
        let manager = DaemonManager::new(&config);
        assert_eq!(manager.evictor_wakeup_ms, 1000);
        assert_eq!(manager.cleaner_wakeup_ms, 2000);
        assert_eq!(manager.checkpointer_wakeup_ms, 3000);
    }

    /// `shutdown` must interrupt a long interval sleep rather than wait it out.
    #[test]
    fn test_shutdown_wakes_daemons_early() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(5000)
            .cleaner_wakeup_interval_ms(5000)
            .checkpointer_wakeup_interval_ms(5000);
        let mut manager = started_manager(&config);
        thread::sleep(Duration::from_millis(50));
        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(1),
            "shutdown took {:?}, expected < 1s",
            elapsed
        );
    }

    /// Without a notification the wait reports `false`.
    #[test]
    fn test_wake_handle_timeout() {
        let handle = WakeHandle::new();
        assert!(!handle.wait_timeout(Duration::from_millis(50)));
    }

    /// A notification from another thread ends the wait early with `true`.
    #[test]
    fn test_wake_handle_notify() {
        let handle = WakeHandle::new();
        let notifier = {
            let remote = handle.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(20));
                remote.notify();
            })
        };
        let start = Instant::now();
        let notified = handle.wait_timeout(Duration::from_secs(5));
        let elapsed = start.elapsed();
        notifier.join().unwrap();
        assert!(notified, "expected notify to return true");
        assert!(
            elapsed < Duration::from_millis(500),
            "took {:?}, expected wakeup within 500ms",
            elapsed
        );
    }

    /// Models the cleaner -> checkpointer -> evictor join sequence with three
    /// hand-rolled threads, where each later thread can only finish after the
    /// previous one has been joined.
    #[test]
    fn test_cc3_shutdown_order_cleaner_checkpointer_evictor() {
        /// One-shot latch: a thread blocks on it until the main thread opens it.
        type Gate = Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>;

        fn new_gate() -> Gate {
            Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()))
        }

        fn await_gate(gate: &Gate) {
            let (flag, cv) = &**gate;
            let mut open = flag.lock().unwrap();
            while !*open {
                open = cv.wait(open).unwrap();
            }
        }

        fn open_gate(gate: &Gate) {
            let (flag, cv) = &**gate;
            *flag.lock().unwrap() = true;
            cv.notify_all();
        }

        /// Mimics a daemon loop: sleep on the wake handle until shutdown is set.
        fn park_until_shutdown(flag: &AtomicBool, wake: &WakeHandle) {
            while !flag.load(Ordering::Relaxed) {
                wake.wait_timeout(Duration::from_millis(5000));
            }
        }

        let mut join_order: Vec<&'static str> = Vec::new();
        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let cleaner_joined = new_gate();
        let checkpointer_joined = new_gate();
        let wake_c = WakeHandle::new();
        let wake_cp = WakeHandle::new();
        let wake_ev = WakeHandle::new();

        let cleaner_t = {
            let flag = Arc::clone(&shutdown_flag);
            let wake = wake_c.clone();
            thread::spawn(move || park_until_shutdown(&flag, &wake))
        };
        let checkpointer_t = {
            let flag = Arc::clone(&shutdown_flag);
            let wake = wake_cp.clone();
            let gate = Arc::clone(&cleaner_joined);
            thread::spawn(move || {
                park_until_shutdown(&flag, &wake);
                // Cannot finish until the cleaner has been joined.
                await_gate(&gate);
            })
        };
        let evictor_t = {
            let flag = Arc::clone(&shutdown_flag);
            let wake = wake_ev.clone();
            let gate = Arc::clone(&checkpointer_joined);
            thread::spawn(move || {
                park_until_shutdown(&flag, &wake);
                // Cannot finish until the checkpointer has been joined.
                await_gate(&gate);
            })
        };

        shutdown_flag.store(true, Ordering::Relaxed);
        wake_c.notify();
        wake_cp.notify();
        wake_ev.notify();

        let start = Instant::now();
        cleaner_t.join().unwrap();
        join_order.push("cleaner");
        open_gate(&cleaner_joined);
        checkpointer_t.join().unwrap();
        join_order.push("checkpointer");
        open_gate(&checkpointer_joined);
        evictor_t.join().unwrap();
        join_order.push("evictor");
        let elapsed = start.elapsed();

        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown stalled: {:?}",
            elapsed
        );
        assert_eq!(
            join_order,
            vec!["cleaner", "checkpointer", "evictor"],
            "CC-3: join order must be cleaner→checkpointer→evictor (JE order)"
        );
    }

    /// With 10 s intervals, `shutdown` must still complete within two seconds.
    #[test]
    fn test_cc3_shutdown_no_deadlock_bounded_time() {
        let config = EngineConfig::default()
            .evictor_wakeup_interval_ms(10_000)
            .cleaner_wakeup_interval_ms(10_000)
            .checkpointer_wakeup_interval_ms(10_000);
        let mut manager = started_manager(&config);
        thread::sleep(Duration::from_millis(30));
        let start = Instant::now();
        manager.shutdown();
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "CC-3: shutdown deadlocked or stalled: took {:?}",
            elapsed
        );
        assert!(!manager.is_running());
    }
}