use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex as ParkingMutex;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle;
use super::daemon::LifecycleDaemon;
use super::group::LifecycleGroup;
/// Upper bound on the backoff exponent used by `should_retry_now`: however
/// many consecutive failures a replica accumulates, it is still retried on
/// every tick that is a multiple of `1 << MAX_BACKOFF_SHIFT` (= 256).
const MAX_BACKOFF_SHIFT: u32 = 8;
/// Maximum number of replica indices whose failure counters are tracked.
/// Indices at or beyond this limit have no counter and therefore never back
/// off. 256 matches the number of distinct `u8` indices the replacement
/// factory can be called with.
const MAX_TRACKED_INDICES: usize = 256;
/// Counters published by a [`HealthMonitor`] and exposed through
/// [`HealthMonitor::stats`] while the monitor runs.
///
/// Everything starts at zero / empty (`Default`). The atomic counters are
/// only ever incremented; `consecutive_failures` entries are the one value
/// that goes back down (to zero, when a replica is observed healthy).
#[derive(Debug, Default)]
pub struct HealthMonitorStats {
    /// Number of completed poll-loop iterations. Advanced after every pass,
    /// including iterations where the group slot was `None`.
    pub ticks: AtomicU64,
    /// Number of times the factory built a replacement and
    /// `LifecycleGroup::replace` was attempted.
    pub replacements_initiated: AtomicU64,
    /// Number of replacement attempts for which `replace` returned an error.
    pub replacements_failed: AtomicU64,
    /// Number of times an unhealthy replica was left in place because its
    /// exponential backoff had not elapsed yet.
    pub backoff_skips: AtomicU64,
    /// Per-replica-index count of replacement attempts since the replica was
    /// last seen healthy. Incremented after every attempt whether or not
    /// `replace` succeeded; reset to zero on a healthy observation. Grows on
    /// demand but never beyond `MAX_TRACKED_INDICES` entries.
    pub consecutive_failures: ParkingMutex<Vec<u32>>,
    /// When the most recent poll-loop iteration finished; `None` until the
    /// first one completes.
    pub last_tick_at: ParkingMutex<Option<std::time::Instant>>,
}
/// Handle to a background task that periodically health-checks a
/// `LifecycleGroup` and replaces unhealthy replicas.
///
/// Created with [`HealthMonitor::spawn`] and shut down with
/// [`HealthMonitor::stop`].
///
/// NOTE(review): no `Drop` impl is visible in this file, and dropping a Tokio
/// `JoinHandle` detaches rather than cancels the task, so dropping this handle
/// without calling `stop` presumably leaves the poll task running — confirm.
pub struct HealthMonitor<L: LifecycleDaemon> {
    /// Counters shared with (and updated by) the background task.
    stats: Arc<HealthMonitorStats>,
    /// Set by `stop`; the task checks it before and after waiting for each tick.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the poll task; taken (leaving `None`) by the first `stop`.
    task: AsyncMutex<Option<JoinHandle<()>>>,
    /// Ties the handle to the daemon type `L` without storing a value of it.
    _marker: std::marker::PhantomData<L>,
}
/// Performs one health-check-and-repair pass over every replica in `group`.
///
/// For each entry of the health snapshot:
/// * healthy: its failure counter (if tracked) is reset to zero;
/// * unhealthy but still inside its backoff window: it is left alone and
///   `backoff_skips` is bumped;
/// * otherwise: `factory` builds a replacement for that index (an index that
///   does not fit in a `u8` is passed as `u8::MAX`), `group.replace` swaps it
///   in, and the replica's failure counter is incremented. A failed `replace`
///   is counted and logged but does not abort the pass.
///
/// The counter is bumped after every attempt, even a successful one, so a
/// replacement that also comes up unhealthy keeps backing off; only a healthy
/// observation clears it. Indices at or beyond `MAX_TRACKED_INDICES` have no
/// counter and are retried on every pass.
///
/// The `consecutive_failures` lock is only taken in short scopes and is never
/// held across an `.await`.
///
/// Returns `true` if at least one replacement was attempted.
async fn run_poll_pass<L, F>(
    group: &mut LifecycleGroup<L>,
    factory: &mut F,
    stats: &Arc<HealthMonitorStats>,
) -> bool
where
    L: LifecycleDaemon,
    F: FnMut(u8) -> Arc<L> + Send + 'static,
{
    let snapshot = group.health().await;
    let mut attempted_any = false;

    // Give every tracked index a counter slot before the loop touches it.
    // The table only grows, and never past the cap.
    {
        let replica_count = snapshot.len();
        let mut table = stats.consecutive_failures.lock();
        if table.len() < replica_count {
            table.resize(replica_count.min(MAX_TRACKED_INDICES), 0);
        }
    }

    // Backoff is judged against the tick count as of the start of this pass.
    let tick_now = stats.ticks.load(Ordering::Acquire);

    for (replica, report) in snapshot.iter().enumerate() {
        let tracked = replica < MAX_TRACKED_INDICES;

        if report.healthy {
            if tracked {
                if let Some(count) = stats.consecutive_failures.lock().get_mut(replica) {
                    *count = 0;
                }
            }
            continue;
        }

        let prior_failures = if tracked {
            stats
                .consecutive_failures
                .lock()
                .get(replica)
                .copied()
                .unwrap_or(0)
        } else {
            0
        };

        if !should_retry_now(prior_failures, tick_now) {
            stats.backoff_skips.fetch_add(1, Ordering::AcqRel);
            continue;
        }

        let factory_index = u8::try_from(replica).unwrap_or(u8::MAX);
        let replacement = factory(factory_index);
        stats.replacements_initiated.fetch_add(1, Ordering::AcqRel);
        attempted_any = true;

        if let Err(e) = group.replace(replica, replacement).await {
            stats.replacements_failed.fetch_add(1, Ordering::AcqRel);
            tracing::warn!(
                error = %e,
                replica_index = replica,
                "HealthMonitor: replace failed; continuing"
            );
        }

        if tracked {
            if let Some(count) = stats.consecutive_failures.lock().get_mut(replica) {
                *count = count.saturating_add(1);
            }
        }
    }

    attempted_any
}
/// Decides whether an unhealthy replica with `failures` consecutive failed
/// replacements may be replaced again on `current_tick`.
///
/// With no prior failures the answer is always yes. Otherwise a retry is
/// allowed only on ticks that are multiples of
/// `2^min(failures, MAX_BACKOFF_SHIFT)`, i.e. exponential backoff capped at
/// `1 << MAX_BACKOFF_SHIFT` ticks. The period is aligned to absolute tick
/// numbers rather than to when the failure happened, so the first wait after
/// a failure can be shorter than a full period.
fn should_retry_now(failures: u32, current_tick: u64) -> bool {
    match failures {
        0 => true,
        n => {
            // The period is a power of two, so "divisible by the period" is
            // the same as "all bits below the period are zero".
            let period_mask = (1u64 << n.min(MAX_BACKOFF_SHIFT)) - 1;
            (current_tick & period_mask) == 0
        }
    }
}
impl<L: LifecycleDaemon> HealthMonitor<L> {
pub fn spawn<F>(
group: Arc<AsyncMutex<Option<LifecycleGroup<L>>>>,
mut factory: F,
interval: Duration,
) -> Self
where
F: FnMut(u8) -> Arc<L> + Send + 'static,
{
let stats = Arc::new(HealthMonitorStats::default());
let shutdown = Arc::new(AtomicBool::new(false));
let stats_for_task = stats.clone();
let shutdown_for_task = shutdown.clone();
let task = tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.tick().await;
loop {
if shutdown_for_task.load(Ordering::Acquire) {
return;
}
ticker.tick().await;
if shutdown_for_task.load(Ordering::Acquire) {
return;
}
{
let mut guard = group.lock().await;
if let Some(lg) = guard.as_mut() {
let _ = run_poll_pass(lg, &mut factory, &stats_for_task).await;
}
}
stats_for_task.ticks.fetch_add(1, Ordering::AcqRel);
*stats_for_task.last_tick_at.lock() = Some(std::time::Instant::now());
}
});
Self {
stats,
shutdown,
task: AsyncMutex::new(Some(task)),
_marker: std::marker::PhantomData,
}
}
pub fn stats(&self) -> &Arc<HealthMonitorStats> {
&self.stats
}
pub async fn stop(&self) {
self.shutdown.store(true, Ordering::Release);
let task = self.task.lock().await.take();
if let Some(t) = task {
let _ = t.await;
}
}
}
#[cfg(test)]
mod tests {
    use super::super::daemon::{LifecycleError, ReplicaHealth};
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::AtomicBool as StdAtomicBool;

    /// Test daemon whose reported health is controlled by a flag, and which
    /// counts how many times it was started and stopped.
    struct ToggleHealthDaemon {
        /// When `true`, `health()` reports unhealthy.
        unhealthy: StdAtomicBool,
        /// Number of `on_start` calls.
        starts: AtomicU64,
        /// Number of `on_stop` calls.
        stops: AtomicU64,
    }
    impl ToggleHealthDaemon {
        /// Builds a daemon with zeroed counters that starts unhealthy iff
        /// `start_unhealthy` is true.
        fn new(start_unhealthy: bool) -> Self {
            Self {
                unhealthy: StdAtomicBool::new(start_unhealthy),
                starts: AtomicU64::new(0),
                stops: AtomicU64::new(0),
            }
        }
    }
    #[async_trait]
    impl LifecycleDaemon for ToggleHealthDaemon {
        fn name(&self) -> &str {
            "toggle"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
        // Health mirrors the `unhealthy` flag.
        async fn health(&self) -> ReplicaHealth {
            if self.unhealthy.load(Ordering::Acquire) {
                ReplicaHealth::unhealthy("toggle-set-unhealthy")
            } else {
                ReplicaHealth::healthy()
            }
        }
    }
    /// A replica that starts unhealthy (index 1) gets replaced: the monitor's
    /// factory is called for index 1, the original daemon is stopped, and the
    /// group ends up holding a different `Arc` at index 1.
    #[tokio::test]
    async fn monitor_replaces_an_unhealthy_replica_after_one_poll() {
        // Record every daemon the group's own factory builds so the original
        // index-1 replica can be inspected after it has been replaced.
        let original_replicas: Arc<parking_lot::Mutex<Vec<Arc<ToggleHealthDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let original_clone = original_replicas.clone();
        let group = LifecycleGroup::<ToggleHealthDaemon>::spawn(2, [0u8; 32], move |idx| {
            let d = Arc::new(ToggleHealthDaemon::new(idx == 1));
            original_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("spawn group");
        // NOTE(review): assumes the group factory runs in index order, so
        // vector position 1 is the replica at index 1 — confirm.
        let original_at_1 = original_replicas.lock()[1].clone();
        let group = Arc::new(AsyncMutex::new(Some(group)));
        // Indices the monitor's replacement factory was called with.
        let factory_calls: Arc<parking_lot::Mutex<Vec<u8>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let factory_calls_clone = factory_calls.clone();
        let monitor = HealthMonitor::spawn(
            group.clone(),
            move |idx| {
                factory_calls_clone.lock().push(idx);
                Arc::new(ToggleHealthDaemon::new(false))
            },
            Duration::from_millis(50),
        );
        // The monitor consumes its immediate first tick, so the first poll
        // lands ~50 ms after spawn; 140 ms leaves room for at least one pass.
        // Wall-clock based: may be sensitive to a heavily loaded machine.
        tokio::time::sleep(Duration::from_millis(140)).await;
        assert!(
            !factory_calls.lock().is_empty(),
            "factory should have been called at least once"
        );
        assert!(
            factory_calls.lock().contains(&1),
            "factory must have been called for the unhealthy index 1"
        );
        assert!(
            original_at_1.stops.load(Ordering::Acquire) >= 1,
            "original index-1 daemon must have been stopped during replace"
        );
        {
            let g = group.lock().await;
            let lg = g.as_ref().expect("group not taken");
            let now_at_1 = lg.replica(1).expect("replica 1");
            assert!(
                !Arc::ptr_eq(&now_at_1, &original_at_1),
                "replica 1 should be the replacement, not the original"
            );
        }
        assert!(
            monitor
                .stats()
                .replacements_initiated
                .load(Ordering::Acquire)
                >= 1
        );
        assert!(monitor.stats().ticks.load(Ordering::Acquire) >= 1);
        // Stop the monitor first: its task owns a clone of `group`, and
        // `Arc::try_unwrap` only succeeds once that clone has been dropped.
        monitor.stop().await;
        let g = Arc::try_unwrap(group)
            .map_err(|_| "still referenced")
            .expect("only ref")
            .into_inner();
        if let Some(lg) = g {
            lg.stop().await;
        }
    }
    /// With every replica healthy, the monitor keeps ticking but never calls
    /// its factory or initiates a replacement.
    #[tokio::test]
    async fn monitor_skips_replace_when_all_healthy() {
        let group = LifecycleGroup::<ToggleHealthDaemon>::spawn(2, [0u8; 32], |_idx| {
            Arc::new(ToggleHealthDaemon::new(false))
        })
        .await
        .expect("spawn");
        let group = Arc::new(AsyncMutex::new(Some(group)));
        let factory_calls: Arc<parking_lot::Mutex<u32>> = Arc::new(parking_lot::Mutex::new(0));
        let factory_calls_clone = factory_calls.clone();
        let monitor = HealthMonitor::spawn(
            group.clone(),
            move |_idx| {
                *factory_calls_clone.lock() += 1;
                Arc::new(ToggleHealthDaemon::new(false))
            },
            Duration::from_millis(30),
        );
        // ~100 ms at a 30 ms interval allows a few polls (wall-clock based).
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(*factory_calls.lock(), 0, "factory must not be called");
        assert_eq!(
            monitor
                .stats()
                .replacements_initiated
                .load(Ordering::Acquire),
            0
        );
        assert!(monitor.stats().ticks.load(Ordering::Acquire) >= 1);
        // Same teardown order as above: stop the monitor before unwrapping.
        monitor.stop().await;
        let g = Arc::try_unwrap(group)
            .map_err(|_| "still referenced")
            .expect("only ref")
            .into_inner();
        if let Some(lg) = g {
            lg.stop().await;
        }
    }
    /// Zero failures means no backoff: every tick is eligible.
    #[test]
    fn should_retry_now_first_failure_always_retries() {
        for tick in 0..20u64 {
            assert!(should_retry_now(0, tick), "tick {tick} with 0 failures");
        }
    }
    /// Each additional failure doubles the retry period (2, 4, 8 ticks).
    #[test]
    fn should_retry_now_backoff_grows_exponentially() {
        let retries_with_1: Vec<u64> = (0..16).filter(|t| should_retry_now(1, *t)).collect();
        assert_eq!(retries_with_1, vec![0, 2, 4, 6, 8, 10, 12, 14]);
        let retries_with_2: Vec<u64> = (0..16).filter(|t| should_retry_now(2, *t)).collect();
        assert_eq!(retries_with_2, vec![0, 4, 8, 12]);
        let retries_with_3: Vec<u64> = (0..16).filter(|t| should_retry_now(3, *t)).collect();
        assert_eq!(retries_with_3, vec![0, 8]);
    }
    /// Past `MAX_BACKOFF_SHIFT` the period stops growing, so even huge failure
    /// counts are retried on multiples of `1 << MAX_BACKOFF_SHIFT`.
    #[test]
    fn should_retry_now_caps_at_max_backoff_shift() {
        let max_step: u64 = 1u64 << MAX_BACKOFF_SHIFT;
        assert!(should_retry_now(100, max_step));
        assert!(!should_retry_now(100, max_step + 1));
        assert!(should_retry_now(u32::MAX, max_step));
    }
    /// Test daemon that always reports unhealthy, so every replacement built
    /// from it is unhealthy too and its failure counter never resets.
    struct PerpetuallyUnhealthyDaemon {
        starts: AtomicU64,
        stops: AtomicU64,
    }
    #[async_trait]
    impl LifecycleDaemon for PerpetuallyUnhealthyDaemon {
        fn name(&self) -> &str {
            "perpetually-unhealthy"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
        async fn health(&self) -> ReplicaHealth {
            ReplicaHealth::unhealthy("never-recovers")
        }
    }
    /// A replica that never recovers must be throttled by backoff: over
    /// ~20 ticks, far fewer replacements are initiated than ticks elapse, and
    /// the skipped ticks show up in `backoff_skips`.
    #[tokio::test]
    async fn monitor_backoff_throttles_replaces_after_consecutive_failures() {
        let group = LifecycleGroup::<PerpetuallyUnhealthyDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(PerpetuallyUnhealthyDaemon {
                starts: AtomicU64::new(0),
                stops: AtomicU64::new(0),
            })
        })
        .await
        .expect("spawn");
        let group = Arc::new(AsyncMutex::new(Some(group)));
        let monitor = HealthMonitor::spawn(
            group.clone(),
            |_idx| {
                Arc::new(PerpetuallyUnhealthyDaemon {
                    starts: AtomicU64::new(0),
                    stops: AtomicU64::new(0),
                })
            },
            Duration::from_millis(15),
        );
        // 300 ms / 15 ms ≈ 20 ticks (wall-clock based).
        tokio::time::sleep(Duration::from_millis(300)).await;
        let initiated = monitor
            .stats()
            .replacements_initiated
            .load(Ordering::Acquire);
        let skips = monitor.stats().backoff_skips.load(Ordering::Acquire);
        assert!(
            initiated <= 12,
            "without backoff this would be 20+; got {initiated}"
        );
        assert!(
            skips >= 3,
            "expected backoff to skip at least 3 ticks; got {skips}"
        );
        // Stop the monitor before unwrapping so its clone of `group` is gone.
        monitor.stop().await;
        let g = Arc::try_unwrap(group)
            .map_err(|_| "still referenced")
            .expect("only ref")
            .into_inner();
        if let Some(lg) = g {
            lg.stop().await;
        }
    }
}