net/adapter/net/behavior/lifecycle/monitor.rs
1//! [`HealthMonitor`] — background driver that polls a
2//! [`LifecycleGroup`]'s per-replica health and respawns
3//! unhealthy replicas via an operator-supplied factory.
4//!
5//! Direction B / step 4b of
6//! `docs/plans/AGGREGATOR_LIFECYCLE_DEFERRED_2026_05_23.md`.
7//! Built as a sibling to `LifecycleGroup` rather than baked
8//! into the group itself so:
9//!
10//! - Groups that don't want auto-respawn (single-process tests,
11//! purely-snapshot CLI inspection) pay no overhead.
12//! - The monitor's factory + state lives outside the group, so
13//! the group's lifetime + the monitor's lifetime are
14//! independent (the monitor can be stopped while the group
15//! stays running, or vice versa).
16//!
17//! # Threading
18//!
//! The monitor takes an
//! `Arc<tokio::sync::Mutex<Option<LifecycleGroup<L>>>>`. The
//! async mutex is required because the monitor's poll + replace
//! path holds the lock across `.await` points; the `Option` lets
//! the owner `take` the group out, after which the monitor's
//! polls become no-ops. Operators who never auto-respawn keep
//! their `LifecycleGroup` un-wrapped; switching to managed mode
//! is a one-line wrap.
24//!
25//! # What's NOT in this slice
26//!
27//! - **Registry integration.** The `AggregatorRegistry` stores
28//! replicas + handles separately (not as a `LifecycleGroup`),
29//! so wiring auto-respawn into registry-managed groups needs
30//! a small registry refactor — tracked as a follow-up.
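//! - **Giving up on repeated failure.** Replace attempts for a
//!   replica that keeps going unhealthy are spaced out with a
//!   tick-based exponential backoff, but the monitor never stops
//!   retrying on its own. Operators can read
//!   `HealthMonitorStats::replacements_failed` and
//!   `HealthMonitorStats::consecutive_failures` to detect
//!   persistent failures and shut the monitor down.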
35
36use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Duration;
39
40use parking_lot::Mutex as ParkingMutex;
41use tokio::sync::Mutex as AsyncMutex;
42use tokio::task::JoinHandle;
43
44use super::daemon::LifecycleDaemon;
45use super::group::LifecycleGroup;
46
/// Upper bound on the backoff exponent: replace attempts for a
/// single replica are spaced at most `2^MAX_BACKOFF_SHIFT`
/// (= 256) ticks apart. With a 1 s monitor interval that is
/// roughly 256 s of cooldown — long enough that a persistently
/// broken replica stops churning the registry, short enough
/// that recovery shows up within minutes of the underlying fix
/// landing.
const MAX_BACKOFF_SHIFT: u32 = 8;

/// Largest number of per-index `consecutive_failures` slots the
/// monitor will allocate. Replica indices are `u8`, so one slot
/// per representable index (256) covers the worst case.
const MAX_TRACKED_INDICES: usize = u8::MAX as usize + 1;
59
60/// Runtime counters surfaced to operator tooling. Atomic
61/// fields so reads from CLI / Deck panels are wait-free.
62#[derive(Debug, Default)]
63pub struct HealthMonitorStats {
64 /// Number of poll ticks the monitor has completed since
65 /// `spawn`. Increments after each pass over the group's
66 /// replicas, regardless of how many were unhealthy.
67 pub ticks: AtomicU64,
68 /// Number of `LifecycleGroup::replace` calls initiated.
69 pub replacements_initiated: AtomicU64,
70 /// Number of replace attempts that failed (factory
71 /// returned a daemon whose `on_start` errored, or the slot
72 /// index went out of bounds mid-respawn). Operators
73 /// detecting persistent failures should consult this.
74 pub replacements_failed: AtomicU64,
75 /// Number of poll ticks where the monitor skipped a known-
76 /// unhealthy replica because exponential backoff hadn't
77 /// elapsed yet. Counts every (tick × skipped-replica)
78 /// combination; operators reading the per-index backoff
79 /// state should consult `consecutive_failures`.
80 pub backoff_skips: AtomicU64,
81 /// Per-replica-index consecutive-failure counter. Bumped
82 /// each tick the replica is still unhealthy after a
83 /// replace attempt; reset to 0 when the replica reports
84 /// healthy. The `2^consecutive_failures` shift drives the
85 /// next-retry-tick computation (capped at
86 /// `MAX_BACKOFF_SHIFT`).
87 pub consecutive_failures: ParkingMutex<Vec<u32>>,
88 /// `Instant` of the most recent poll tick, recorded after
89 /// each pass. `None` until the first tick lands.
90 pub last_tick_at: ParkingMutex<Option<std::time::Instant>>,
91}
92
93/// Background driver for [`LifecycleGroup`] auto-respawn.
94/// Construct via [`HealthMonitor::spawn`]; stop via
95/// [`HealthMonitor::stop`].
96pub struct HealthMonitor<L: LifecycleDaemon> {
97 stats: Arc<HealthMonitorStats>,
98 shutdown: Arc<AtomicBool>,
99 task: AsyncMutex<Option<JoinHandle<()>>>,
100 /// Held for type inference + the `_marker` field that lets
101 /// the monitor outlive the spawning function without
102 /// dangling. The actual group reference is captured by the
103 /// spawned task.
104 _marker: std::marker::PhantomData<L>,
105}
106
107/// One health-poll + respawn pass against a live group.
108///
109/// Honors the per-index exponential backoff stored in
110/// `stats.consecutive_failures`. A replica that's been
111/// unhealthy for N consecutive ticks gets retried at
112/// `2^min(N, MAX_BACKOFF_SHIFT)` tick intervals — so a
113/// persistently broken replica stops churning the registry
114/// + the `LifecycleGroup::replace` lock at every tick.
115///
116/// Backoff state transitions:
117/// - Replica reports healthy → reset to 0.
118/// - Replica reports unhealthy + retry due → attempt replace,
119/// then bump counter (the replacement is judged on the next
120/// tick).
121/// - Replica reports unhealthy + retry not due → record a
122/// skip in `stats.backoff_skips` and continue.
123async fn run_poll_pass<L, F>(
124 group: &mut LifecycleGroup<L>,
125 factory: &mut F,
126 stats: &Arc<HealthMonitorStats>,
127) -> bool
128where
129 L: LifecycleDaemon,
130 F: FnMut(u8) -> Arc<L> + Send + 'static,
131{
132 let snapshot = group.health().await;
133 let mut any_work = false;
134 // Grow the per-index failure Vec to cover this group.
135 {
136 let mut failures = stats.consecutive_failures.lock();
137 if failures.len() < snapshot.len() {
138 failures.resize(snapshot.len().min(MAX_TRACKED_INDICES), 0);
139 }
140 }
141 let current_tick = stats.ticks.load(Ordering::Acquire);
142 for (idx, h) in snapshot.iter().enumerate() {
143 if h.healthy {
144 // Healthy → reset the failure counter; backoff
145 // disappears immediately so a recovered replica
146 // doesn't drag a stale skip-counter into the next
147 // failure cycle.
148 if idx < MAX_TRACKED_INDICES {
149 let mut failures = stats.consecutive_failures.lock();
150 if let Some(slot) = failures.get_mut(idx) {
151 *slot = 0;
152 }
153 }
154 continue;
155 }
156 // Unhealthy. Decide whether the backoff lets us retry.
157 let failures_before = if idx < MAX_TRACKED_INDICES {
158 stats
159 .consecutive_failures
160 .lock()
161 .get(idx)
162 .copied()
163 .unwrap_or(0)
164 } else {
165 0
166 };
167 if !should_retry_now(failures_before, current_tick) {
168 stats.backoff_skips.fetch_add(1, Ordering::AcqRel);
169 continue;
170 }
171 // Retry due — attempt the replace, then bump the
172 // counter regardless of replace-error vs replace-ok
173 // (the new daemon's health is judged next tick).
174 let new_daemon = factory(u8::try_from(idx).unwrap_or(u8::MAX));
175 stats.replacements_initiated.fetch_add(1, Ordering::AcqRel);
176 any_work = true;
177 if let Err(e) = group.replace(idx, new_daemon).await {
178 stats.replacements_failed.fetch_add(1, Ordering::AcqRel);
179 tracing::warn!(
180 error = %e,
181 replica_index = idx,
182 "HealthMonitor: replace failed; continuing"
183 );
184 }
185 if idx < MAX_TRACKED_INDICES {
186 let mut failures = stats.consecutive_failures.lock();
187 if let Some(slot) = failures.get_mut(idx) {
188 *slot = slot.saturating_add(1);
189 }
190 }
191 }
192 any_work
193}
194
195/// Should we attempt a replace this tick given `failures`
196/// prior failures + the `current_tick` counter?
197///
198/// - `failures == 0` → first sighting; always retry.
199/// - `failures >= 1` → retry every `2^min(failures, MAX_BACKOFF_SHIFT)`
200/// ticks. Concretely: `current_tick % step == 0` where
201/// `step = 2^min(failures, MAX_BACKOFF_SHIFT)`.
202fn should_retry_now(failures: u32, current_tick: u64) -> bool {
203 if failures == 0 {
204 return true;
205 }
206 let shift = failures.min(MAX_BACKOFF_SHIFT);
207 let step: u64 = 1u64 << shift;
208 current_tick.is_multiple_of(step)
209}
210
211impl<L: LifecycleDaemon> HealthMonitor<L> {
212 /// Spawn a background task that polls `group.health()`
213 /// every `interval` and calls
214 /// `group.replace(index, factory(index))` for each replica
215 /// reporting unhealthy.
216 ///
217 /// The group is an `Arc<AsyncMutex<Option<LifecycleGroup<L>>>>`
218 /// so the registry's `unregister` path can `take` the group
219 /// out without disturbing the monitor — the monitor's poll
220 /// becomes a no-op once the `Option` is `None`. Pure-RAII
221 /// callers wrap their group with `Some(...)` at construction.
222 pub fn spawn<F>(
223 group: Arc<AsyncMutex<Option<LifecycleGroup<L>>>>,
224 mut factory: F,
225 interval: Duration,
226 ) -> Self
227 where
228 F: FnMut(u8) -> Arc<L> + Send + 'static,
229 {
230 let stats = Arc::new(HealthMonitorStats::default());
231 let shutdown = Arc::new(AtomicBool::new(false));
232 let stats_for_task = stats.clone();
233 let shutdown_for_task = shutdown.clone();
234
235 let task = tokio::spawn(async move {
236 let mut ticker = tokio::time::interval(interval);
237 // Skip the immediate first tick so the monitor's
238 // first poll happens after one full interval —
239 // gives daemons a chance to settle in.
240 ticker.tick().await;
241 loop {
242 if shutdown_for_task.load(Ordering::Acquire) {
243 return;
244 }
245 ticker.tick().await;
246 if shutdown_for_task.load(Ordering::Acquire) {
247 return;
248 }
249 // Hold the lock for the entire health-poll +
250 // respawn pass. The `Option` short-circuits if
251 // the group's been taken via unregister.
252 {
253 let mut guard = group.lock().await;
254 if let Some(lg) = guard.as_mut() {
255 let _ = run_poll_pass(lg, &mut factory, &stats_for_task).await;
256 }
257 }
258 stats_for_task.ticks.fetch_add(1, Ordering::AcqRel);
259 *stats_for_task.last_tick_at.lock() = Some(std::time::Instant::now());
260 }
261 });
262
263 Self {
264 stats,
265 shutdown,
266 task: AsyncMutex::new(Some(task)),
267 _marker: std::marker::PhantomData,
268 }
269 }
270
271 /// Borrow the runtime counters for operator tooling.
272 pub fn stats(&self) -> &Arc<HealthMonitorStats> {
273 &self.stats
274 }
275
276 /// Signal the monitor loop to stop and await its teardown.
277 /// Idempotent — calling twice is a no-op the second time.
278 pub async fn stop(&self) {
279 self.shutdown.store(true, Ordering::Release);
280 let task = self.task.lock().await.take();
281 if let Some(t) = task {
282 let _ = t.await;
283 }
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::super::daemon::{LifecycleError, ReplicaHealth};
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::AtomicBool as StdAtomicBool;

    /// Stops `monitor`, then reclaims the group from its shared
    /// wrapper and stops it too. The monitor task must be gone
    /// by then, otherwise its clone of the `Arc` makes the
    /// unwrap fail.
    async fn tear_down<L: LifecycleDaemon>(
        monitor: &HealthMonitor<L>,
        group: Arc<AsyncMutex<Option<LifecycleGroup<L>>>>,
    ) {
        monitor.stop().await;
        let reclaimed = Arc::try_unwrap(group)
            .map_err(|_| "still referenced")
            .expect("only ref")
            .into_inner();
        if let Some(live_group) = reclaimed {
            live_group.stop().await;
        }
    }

    /// Daemon whose health is fixed at construction and which
    /// counts its start/stop callbacks.
    struct ToggleHealthDaemon {
        unhealthy: StdAtomicBool,
        starts: AtomicU64,
        stops: AtomicU64,
    }

    impl ToggleHealthDaemon {
        fn new(start_unhealthy: bool) -> Self {
            Self {
                unhealthy: StdAtomicBool::new(start_unhealthy),
                starts: AtomicU64::default(),
                stops: AtomicU64::default(),
            }
        }
    }

    #[async_trait]
    impl LifecycleDaemon for ToggleHealthDaemon {
        fn name(&self) -> &str {
            "toggle"
        }

        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }

        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }

        async fn health(&self) -> ReplicaHealth {
            match self.unhealthy.load(Ordering::Acquire) {
                true => ReplicaHealth::unhealthy("toggle-set-unhealthy"),
                false => ReplicaHealth::healthy(),
            }
        }
    }

    #[tokio::test]
    async fn monitor_replaces_an_unhealthy_replica_after_one_poll() {
        // Two replicas, index 1 unhealthy from the start. The
        // monitor's factory hands out healthy daemons, so after
        // one poll interval slot 1 must hold a fresh daemon.
        let spawned: Arc<parking_lot::Mutex<Vec<Arc<ToggleHealthDaemon>>>> = Arc::default();
        let spawned_sink = Arc::clone(&spawned);
        let group = LifecycleGroup::<ToggleHealthDaemon>::spawn(2, [0u8; 32], move |idx| {
            let daemon = Arc::new(ToggleHealthDaemon::new(idx == 1));
            spawned_sink.lock().push(Arc::clone(&daemon));
            daemon
        })
        .await
        .expect("spawn group");
        let first_occupant = Arc::clone(&spawned.lock()[1]);

        let group = Arc::new(AsyncMutex::new(Some(group)));
        let factory_calls: Arc<parking_lot::Mutex<Vec<u8>>> = Arc::default();
        let factory_log = Arc::clone(&factory_calls);
        let monitor = HealthMonitor::spawn(
            Arc::clone(&group),
            move |idx| {
                factory_log.lock().push(idx);
                // Healthy replacement, so later polls leave it
                // alone.
                Arc::new(ToggleHealthDaemon::new(false))
            },
            Duration::from_millis(50),
        );

        // The first poll lands at ~50ms. Wait long enough for
        // one or two polls, but not long enough to hide a
        // runaway replace loop.
        tokio::time::sleep(Duration::from_millis(140)).await;

        // The factory ran for index 1 and the original occupant
        // was stopped.
        assert!(
            !factory_calls.lock().is_empty(),
            "factory should have been called at least once"
        );
        assert!(
            factory_calls.lock().contains(&1),
            "factory must have been called for the unhealthy index 1"
        );
        assert!(
            first_occupant.stops.load(Ordering::Acquire) >= 1,
            "original index-1 daemon must have been stopped during replace"
        );

        // Slot 1 now holds a different `Arc` than before.
        {
            let guard = group.lock().await;
            let live_group = guard.as_ref().expect("group not taken");
            let current_occupant = live_group.replica(1).expect("replica 1");
            assert!(
                !Arc::ptr_eq(&current_occupant, &first_occupant),
                "replica 1 should be the replacement, not the original"
            );
        }

        // Counters moved.
        let stats = monitor.stats();
        assert!(stats.replacements_initiated.load(Ordering::Acquire) >= 1);
        assert!(stats.ticks.load(Ordering::Acquire) >= 1);

        tear_down(&monitor, group).await;
    }

    #[tokio::test]
    async fn monitor_skips_replace_when_all_healthy() {
        // Every replica is healthy, so the factory must stay
        // untouched.
        let group = LifecycleGroup::<ToggleHealthDaemon>::spawn(2, [0u8; 32], |_idx| {
            Arc::new(ToggleHealthDaemon::new(false))
        })
        .await
        .expect("spawn");
        let group = Arc::new(AsyncMutex::new(Some(group)));
        let factory_calls: Arc<parking_lot::Mutex<u32>> = Arc::default();
        let factory_counter = Arc::clone(&factory_calls);
        let monitor = HealthMonitor::spawn(
            Arc::clone(&group),
            move |_idx| {
                *factory_counter.lock() += 1;
                Arc::new(ToggleHealthDaemon::new(false))
            },
            Duration::from_millis(30),
        );

        // Let several poll intervals go by.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(*factory_calls.lock(), 0, "factory must not be called");
        let stats = monitor.stats();
        assert_eq!(stats.replacements_initiated.load(Ordering::Acquire), 0);
        assert!(stats.ticks.load(Ordering::Acquire) >= 1);

        tear_down(&monitor, group).await;
    }

    #[test]
    fn should_retry_now_first_failure_always_retries() {
        // Zero prior failures means no backoff at all.
        for tick in 0..20u64 {
            assert!(should_retry_now(0, tick), "tick {tick} with 0 failures");
        }
    }

    #[test]
    fn should_retry_now_backoff_grows_exponentially() {
        // Ticks in 0..16 on which a retry is allowed for a given
        // failure count.
        let due_ticks = |failures: u32| -> Vec<u64> {
            (0..16).filter(|t| should_retry_now(failures, *t)).collect()
        };

        // failures=1 → step=2, i.e. every other tick.
        assert_eq!(due_ticks(1), vec![0, 2, 4, 6, 8, 10, 12, 14]);
        // failures=2 → step=4.
        assert_eq!(due_ticks(2), vec![0, 4, 8, 12]);
        // failures=3 → step=8.
        assert_eq!(due_ticks(3), vec![0, 8]);
    }

    #[test]
    fn should_retry_now_caps_at_max_backoff_shift() {
        // However many failures pile up, the step never exceeds
        // 2^MAX_BACKOFF_SHIFT ticks.
        let max_step: u64 = 1u64 << MAX_BACKOFF_SHIFT;
        assert!(should_retry_now(100, max_step));
        assert!(!should_retry_now(100, max_step + 1));
        // Same cap at the extreme end of the counter range.
        assert!(should_retry_now(u32::MAX, max_step));
    }

    /// Daemon that never reports healthy and counts its
    /// start/stop callbacks. Used to show that a persistent
    /// failure does not trigger a replace on every tick.
    #[derive(Default)]
    struct PerpetuallyUnhealthyDaemon {
        starts: AtomicU64,
        stops: AtomicU64,
    }

    #[async_trait]
    impl LifecycleDaemon for PerpetuallyUnhealthyDaemon {
        fn name(&self) -> &str {
            "perpetually-unhealthy"
        }

        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }

        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }

        async fn health(&self) -> ReplicaHealth {
            ReplicaHealth::unhealthy("never-recovers")
        }
    }

    #[tokio::test]
    async fn monitor_backoff_throttles_replaces_after_consecutive_failures() {
        // Every replacement is unhealthy too, so without backoff
        // the monitor would replace on each poll. With backoff
        // the replace count should grow roughly logarithmically
        // with elapsed ticks, and most ticks should be skips.
        let group = LifecycleGroup::<PerpetuallyUnhealthyDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(PerpetuallyUnhealthyDaemon::default())
        })
        .await
        .expect("spawn");
        let group = Arc::new(AsyncMutex::new(Some(group)));
        let monitor = HealthMonitor::spawn(
            Arc::clone(&group),
            |_idx| Arc::new(PerpetuallyUnhealthyDaemon::default()),
            Duration::from_millis(15),
        );

        // 300 ms at 15 ms per tick is about 20 ticks. Unthrottled
        // that is ~20 replace attempts; with doubling steps
        // (1, 2, 4, 8, 16 ticks) it should be about 6.
        tokio::time::sleep(Duration::from_millis(300)).await;

        let stats = monitor.stats();
        let initiated = stats.replacements_initiated.load(Ordering::Acquire);
        let skips = stats.backoff_skips.load(Ordering::Acquire);
        // Exact counts depend on CI timing, so only the direction
        // is asserted.
        assert!(
            initiated <= 12,
            "without backoff this would be 20+; got {initiated}"
        );
        assert!(
            skips >= 3,
            "expected backoff to skip at least 3 ticks; got {skips}"
        );

        tear_down(&monitor, group).await;
    }
}
581}