Skip to main content

net/adapter/net/behavior/lifecycle/
monitor.rs

1//! [`HealthMonitor`] — background driver that polls a
2//! [`LifecycleGroup`]'s per-replica health and respawns
3//! unhealthy replicas via an operator-supplied factory.
4//!
5//! Direction B / step 4b of
6//! `docs/plans/AGGREGATOR_LIFECYCLE_DEFERRED_2026_05_23.md`.
7//! Built as a sibling to `LifecycleGroup` rather than baked
8//! into the group itself so:
9//!
10//! - Groups that don't want auto-respawn (single-process tests,
11//!   purely-snapshot CLI inspection) pay no overhead.
12//! - The monitor's factory + state lives outside the group, so
13//!   the group's lifetime + the monitor's lifetime are
14//!   independent (the monitor can be stopped while the group
15//!   stays running, or vice versa).
16//!
17//! # Threading
18//!
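//! The monitor takes an `Arc<tokio::sync::Mutex<Option<LifecycleGroup<L>>>>`
//! — an async mutex is required because the monitor's poll +
//! replace path holds the lock across `.await` points, and the
//! `Option` lets the group's owner `take` it out while the monitor
//! is still running (the monitor's poll then becomes a no-op).
//! Operators who never auto-respawn keep their `LifecycleGroup`
//! un-wrapped; switching to managed mode is a one-line
//! `Arc::new(Mutex::new(Some(group)))` wrap.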
24//!
25//! # What's NOT in this slice
26//!
27//! - **Registry integration.** The `AggregatorRegistry` stores
28//!   replicas + handles separately (not as a `LifecycleGroup`),
29//!   so wiring auto-respawn into registry-managed groups needs
30//!   a small registry refactor — tracked as a follow-up.
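//! - **Giving up on persistent failure.** A replica that keeps
//!   going unhealthy is retried with exponential backoff (on ticks
//!   that are multiples of `2^min(N, MAX_BACKOFF_SHIFT)` after N
//!   consecutive replace attempts) but is never abandoned. Operators
//!   can read `HealthMonitorStats::replacements_failed` and
//!   `HealthMonitorStats::consecutive_failures` to detect persistent
//!   failures and shut the monitor down.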
35
36use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Duration;
39
40use parking_lot::Mutex as ParkingMutex;
41use tokio::sync::Mutex as AsyncMutex;
42use tokio::task::JoinHandle;
43
44use super::daemon::LifecycleDaemon;
45use super::group::LifecycleGroup;
46
/// Upper bound on the backoff exponent.
///
/// Once a replica has accumulated enough consecutive replace
/// attempts, it is retried only every `2^MAX_BACKOFF_SHIFT` (= 256)
/// ticks. That is roughly four minutes at a 1 s monitor interval:
/// long enough that a persistently broken replica stops churning
/// its group, short enough that recovery becomes visible within
/// minutes of the underlying fix landing.
const MAX_BACKOFF_SHIFT: u32 = 8;

/// Upper bound on the length of the per-index failure table
/// (`HealthMonitorStats::consecutive_failures`).
///
/// Replica indices are `u8`, so one slot per possible index
/// (256 in total) covers every replica.
const MAX_TRACKED_INDICES: usize = u8::MAX as usize + 1;
59
60/// Runtime counters surfaced to operator tooling. Atomic
61/// fields so reads from CLI / Deck panels are wait-free.
62#[derive(Debug, Default)]
63pub struct HealthMonitorStats {
64    /// Number of poll ticks the monitor has completed since
65    /// `spawn`. Increments after each pass over the group's
66    /// replicas, regardless of how many were unhealthy.
67    pub ticks: AtomicU64,
68    /// Number of `LifecycleGroup::replace` calls initiated.
69    pub replacements_initiated: AtomicU64,
70    /// Number of replace attempts that failed (factory
71    /// returned a daemon whose `on_start` errored, or the slot
72    /// index went out of bounds mid-respawn). Operators
73    /// detecting persistent failures should consult this.
74    pub replacements_failed: AtomicU64,
75    /// Number of poll ticks where the monitor skipped a known-
76    /// unhealthy replica because exponential backoff hadn't
77    /// elapsed yet. Counts every (tick × skipped-replica)
78    /// combination; operators reading the per-index backoff
79    /// state should consult `consecutive_failures`.
80    pub backoff_skips: AtomicU64,
81    /// Per-replica-index consecutive-failure counter. Bumped
82    /// each tick the replica is still unhealthy after a
83    /// replace attempt; reset to 0 when the replica reports
84    /// healthy. The `2^consecutive_failures` shift drives the
85    /// next-retry-tick computation (capped at
86    /// `MAX_BACKOFF_SHIFT`).
87    pub consecutive_failures: ParkingMutex<Vec<u32>>,
88    /// `Instant` of the most recent poll tick, recorded after
89    /// each pass. `None` until the first tick lands.
90    pub last_tick_at: ParkingMutex<Option<std::time::Instant>>,
91}
92
93/// Background driver for [`LifecycleGroup`] auto-respawn.
94/// Construct via [`HealthMonitor::spawn`]; stop via
95/// [`HealthMonitor::stop`].
96pub struct HealthMonitor<L: LifecycleDaemon> {
97    stats: Arc<HealthMonitorStats>,
98    shutdown: Arc<AtomicBool>,
99    task: AsyncMutex<Option<JoinHandle<()>>>,
100    /// Held for type inference + the `_marker` field that lets
101    /// the monitor outlive the spawning function without
102    /// dangling. The actual group reference is captured by the
103    /// spawned task.
104    _marker: std::marker::PhantomData<L>,
105}
106
107/// One health-poll + respawn pass against a live group.
108///
109/// Honors the per-index exponential backoff stored in
110/// `stats.consecutive_failures`. A replica that's been
111/// unhealthy for N consecutive ticks gets retried at
112/// `2^min(N, MAX_BACKOFF_SHIFT)` tick intervals — so a
113/// persistently broken replica stops churning the registry
114/// + the `LifecycleGroup::replace` lock at every tick.
115///
116/// Backoff state transitions:
117/// - Replica reports healthy → reset to 0.
118/// - Replica reports unhealthy + retry due → attempt replace,
119///   then bump counter (the replacement is judged on the next
120///   tick).
121/// - Replica reports unhealthy + retry not due → record a
122///   skip in `stats.backoff_skips` and continue.
123async fn run_poll_pass<L, F>(
124    group: &mut LifecycleGroup<L>,
125    factory: &mut F,
126    stats: &Arc<HealthMonitorStats>,
127) -> bool
128where
129    L: LifecycleDaemon,
130    F: FnMut(u8) -> Arc<L> + Send + 'static,
131{
132    let snapshot = group.health().await;
133    let mut any_work = false;
134    // Grow the per-index failure Vec to cover this group.
135    {
136        let mut failures = stats.consecutive_failures.lock();
137        if failures.len() < snapshot.len() {
138            failures.resize(snapshot.len().min(MAX_TRACKED_INDICES), 0);
139        }
140    }
141    let current_tick = stats.ticks.load(Ordering::Acquire);
142    for (idx, h) in snapshot.iter().enumerate() {
143        if h.healthy {
144            // Healthy → reset the failure counter; backoff
145            // disappears immediately so a recovered replica
146            // doesn't drag a stale skip-counter into the next
147            // failure cycle.
148            if idx < MAX_TRACKED_INDICES {
149                let mut failures = stats.consecutive_failures.lock();
150                if let Some(slot) = failures.get_mut(idx) {
151                    *slot = 0;
152                }
153            }
154            continue;
155        }
156        // Unhealthy. Decide whether the backoff lets us retry.
157        let failures_before = if idx < MAX_TRACKED_INDICES {
158            stats
159                .consecutive_failures
160                .lock()
161                .get(idx)
162                .copied()
163                .unwrap_or(0)
164        } else {
165            0
166        };
167        if !should_retry_now(failures_before, current_tick) {
168            stats.backoff_skips.fetch_add(1, Ordering::AcqRel);
169            continue;
170        }
171        // Retry due — attempt the replace, then bump the
172        // counter regardless of replace-error vs replace-ok
173        // (the new daemon's health is judged next tick).
174        let new_daemon = factory(u8::try_from(idx).unwrap_or(u8::MAX));
175        stats.replacements_initiated.fetch_add(1, Ordering::AcqRel);
176        any_work = true;
177        if let Err(e) = group.replace(idx, new_daemon).await {
178            stats.replacements_failed.fetch_add(1, Ordering::AcqRel);
179            tracing::warn!(
180                error = %e,
181                replica_index = idx,
182                "HealthMonitor: replace failed; continuing"
183            );
184        }
185        if idx < MAX_TRACKED_INDICES {
186            let mut failures = stats.consecutive_failures.lock();
187            if let Some(slot) = failures.get_mut(idx) {
188                *slot = slot.saturating_add(1);
189            }
190        }
191    }
192    any_work
193}
194
195/// Should we attempt a replace this tick given `failures`
196/// prior failures + the `current_tick` counter?
197///
198/// - `failures == 0` → first sighting; always retry.
199/// - `failures >= 1` → retry every `2^min(failures, MAX_BACKOFF_SHIFT)`
200///   ticks. Concretely: `current_tick % step == 0` where
201///   `step = 2^min(failures, MAX_BACKOFF_SHIFT)`.
202fn should_retry_now(failures: u32, current_tick: u64) -> bool {
203    if failures == 0 {
204        return true;
205    }
206    let shift = failures.min(MAX_BACKOFF_SHIFT);
207    let step: u64 = 1u64 << shift;
208    current_tick.is_multiple_of(step)
209}
210
211impl<L: LifecycleDaemon> HealthMonitor<L> {
212    /// Spawn a background task that polls `group.health()`
213    /// every `interval` and calls
214    /// `group.replace(index, factory(index))` for each replica
215    /// reporting unhealthy.
216    ///
217    /// The group is an `Arc<AsyncMutex<Option<LifecycleGroup<L>>>>`
218    /// so the registry's `unregister` path can `take` the group
219    /// out without disturbing the monitor — the monitor's poll
220    /// becomes a no-op once the `Option` is `None`. Pure-RAII
221    /// callers wrap their group with `Some(...)` at construction.
222    pub fn spawn<F>(
223        group: Arc<AsyncMutex<Option<LifecycleGroup<L>>>>,
224        mut factory: F,
225        interval: Duration,
226    ) -> Self
227    where
228        F: FnMut(u8) -> Arc<L> + Send + 'static,
229    {
230        let stats = Arc::new(HealthMonitorStats::default());
231        let shutdown = Arc::new(AtomicBool::new(false));
232        let stats_for_task = stats.clone();
233        let shutdown_for_task = shutdown.clone();
234
235        let task = tokio::spawn(async move {
236            let mut ticker = tokio::time::interval(interval);
237            // Skip the immediate first tick so the monitor's
238            // first poll happens after one full interval —
239            // gives daemons a chance to settle in.
240            ticker.tick().await;
241            loop {
242                if shutdown_for_task.load(Ordering::Acquire) {
243                    return;
244                }
245                ticker.tick().await;
246                if shutdown_for_task.load(Ordering::Acquire) {
247                    return;
248                }
249                // Hold the lock for the entire health-poll +
250                // respawn pass. The `Option` short-circuits if
251                // the group's been taken via unregister.
252                {
253                    let mut guard = group.lock().await;
254                    if let Some(lg) = guard.as_mut() {
255                        let _ = run_poll_pass(lg, &mut factory, &stats_for_task).await;
256                    }
257                }
258                stats_for_task.ticks.fetch_add(1, Ordering::AcqRel);
259                *stats_for_task.last_tick_at.lock() = Some(std::time::Instant::now());
260            }
261        });
262
263        Self {
264            stats,
265            shutdown,
266            task: AsyncMutex::new(Some(task)),
267            _marker: std::marker::PhantomData,
268        }
269    }
270
271    /// Borrow the runtime counters for operator tooling.
272    pub fn stats(&self) -> &Arc<HealthMonitorStats> {
273        &self.stats
274    }
275
276    /// Signal the monitor loop to stop and await its teardown.
277    /// Idempotent — calling twice is a no-op the second time.
278    pub async fn stop(&self) {
279        self.shutdown.store(true, Ordering::Release);
280        let task = self.task.lock().await.take();
281        if let Some(t) = task {
282            let _ = t.await;
283        }
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::super::daemon::{LifecycleError, ReplicaHealth};
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::AtomicBool as StdAtomicBool;

    /// Shared teardown: stop `monitor`, then take sole ownership of
    /// `group` and stop the group it holds (unless it was taken).
    ///
    /// `HealthMonitor::stop` awaits the polling task. That task owned
    /// the only other clone of `group`, so `Arc::try_unwrap` is
    /// expected to succeed here.
    async fn shut_down<L: LifecycleDaemon>(
        monitor: HealthMonitor<L>,
        group: Arc<AsyncMutex<Option<LifecycleGroup<L>>>>,
    ) {
        monitor.stop().await;
        let group = Arc::try_unwrap(group)
            .map_err(|_| "still referenced")
            .expect("only ref")
            .into_inner();
        if let Some(lg) = group {
            lg.stop().await;
        }
    }

    /// Daemon whose health is controlled by a flag. Counts its
    /// `on_start` / `on_stop` calls.
    struct ToggleHealthDaemon {
        unhealthy: StdAtomicBool,
        starts: AtomicU64,
        stops: AtomicU64,
    }

    impl ToggleHealthDaemon {
        fn new(start_unhealthy: bool) -> Self {
            Self {
                unhealthy: StdAtomicBool::new(start_unhealthy),
                starts: AtomicU64::new(0),
                stops: AtomicU64::new(0),
            }
        }
    }

    #[async_trait]
    impl LifecycleDaemon for ToggleHealthDaemon {
        fn name(&self) -> &str {
            "toggle"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
        async fn health(&self) -> ReplicaHealth {
            if self.unhealthy.load(Ordering::Acquire) {
                ReplicaHealth::unhealthy("toggle-set-unhealthy")
            } else {
                ReplicaHealth::healthy()
            }
        }
    }

    #[tokio::test]
    async fn monitor_replaces_an_unhealthy_replica_after_one_poll() {
        // Build a 2-replica group where replica 1 reports
        // unhealthy. Spawn a monitor with a factory that
        // returns fresh healthy daemons. After one poll
        // interval, replica 1 must be a fresh daemon.
        let original_replicas: Arc<parking_lot::Mutex<Vec<Arc<ToggleHealthDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let original_clone = original_replicas.clone();
        let group = LifecycleGroup::<ToggleHealthDaemon>::spawn(2, [0u8; 32], move |idx| {
            // Replica 1 reports unhealthy from the start.
            let d = Arc::new(ToggleHealthDaemon::new(idx == 1));
            original_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("spawn group");
        let original_at_1 = original_replicas.lock()[1].clone();

        let group = Arc::new(AsyncMutex::new(Some(group)));
        let factory_calls: Arc<parking_lot::Mutex<Vec<u8>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let factory_calls_clone = factory_calls.clone();
        let monitor = HealthMonitor::spawn(
            group.clone(),
            move |idx| {
                factory_calls_clone.lock().push(idx);
                // Replacement is healthy, so the next poll
                // doesn't try to replace it again.
                Arc::new(ToggleHealthDaemon::new(false))
            },
            Duration::from_millis(50),
        );

        // The first poll lands at ~50ms. Sleep long enough for one
        // or two polls, but not so long that we'd see a runaway
        // replace loop.
        tokio::time::sleep(Duration::from_millis(140)).await;

        // The original unhealthy replica must have been stopped
        // and replaced. Factory was called at index 1.
        assert!(
            !factory_calls.lock().is_empty(),
            "factory should have been called at least once"
        );
        assert!(
            factory_calls.lock().contains(&1),
            "factory must have been called for the unhealthy index 1"
        );
        assert!(
            original_at_1.stops.load(Ordering::Acquire) >= 1,
            "original index-1 daemon must have been stopped during replace"
        );

        // The daemon now at index 1 in the group must be a
        // different Arc than the original.
        {
            let g = group.lock().await;
            let lg = g.as_ref().expect("group not taken");
            let now_at_1 = lg.replica(1).expect("replica 1");
            assert!(
                !Arc::ptr_eq(&now_at_1, &original_at_1),
                "replica 1 should be the replacement, not the original"
            );
        }

        // Stats are populated.
        assert!(
            monitor
                .stats()
                .replacements_initiated
                .load(Ordering::Acquire)
                >= 1
        );
        assert!(monitor.stats().ticks.load(Ordering::Acquire) >= 1);

        shut_down(monitor, group).await;
    }

    #[tokio::test]
    async fn monitor_skips_replace_when_all_healthy() {
        // Healthy 2-replica group; the monitor should never call
        // the factory.
        let group = LifecycleGroup::<ToggleHealthDaemon>::spawn(2, [0u8; 32], |_idx| {
            Arc::new(ToggleHealthDaemon::new(false))
        })
        .await
        .expect("spawn");
        let group = Arc::new(AsyncMutex::new(Some(group)));
        let factory_calls: Arc<parking_lot::Mutex<u32>> = Arc::new(parking_lot::Mutex::new(0));
        let factory_calls_clone = factory_calls.clone();
        let monitor = HealthMonitor::spawn(
            group.clone(),
            move |_idx| {
                *factory_calls_clone.lock() += 1;
                Arc::new(ToggleHealthDaemon::new(false))
            },
            Duration::from_millis(30),
        );

        // Run for several poll intervals.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(*factory_calls.lock(), 0, "factory must not be called");
        assert_eq!(
            monitor
                .stats()
                .replacements_initiated
                .load(Ordering::Acquire),
            0
        );
        assert!(monitor.stats().ticks.load(Ordering::Acquire) >= 1);

        shut_down(monitor, group).await;
    }

    #[test]
    fn should_retry_now_first_failure_always_retries() {
        // No prior failures → retry every tick.
        for tick in 0..20u64 {
            assert!(should_retry_now(0, tick), "tick {tick} with 0 failures");
        }
    }

    #[test]
    fn should_retry_now_backoff_grows_exponentially() {
        // failures=1 → step=2, retries at every other tick.
        let retries_with_1: Vec<u64> = (0..16).filter(|t| should_retry_now(1, *t)).collect();
        assert_eq!(retries_with_1, vec![0, 2, 4, 6, 8, 10, 12, 14]);

        // failures=2 → step=4.
        let retries_with_2: Vec<u64> = (0..16).filter(|t| should_retry_now(2, *t)).collect();
        assert_eq!(retries_with_2, vec![0, 4, 8, 12]);

        // failures=3 → step=8.
        let retries_with_3: Vec<u64> = (0..16).filter(|t| should_retry_now(3, *t)).collect();
        assert_eq!(retries_with_3, vec![0, 8]);
    }

    #[test]
    fn should_retry_now_caps_at_max_backoff_shift() {
        // Very high failure counts cap at MAX_BACKOFF_SHIFT,
        // i.e. 2^MAX_BACKOFF_SHIFT ticks between retries.
        let max_step: u64 = 1u64 << MAX_BACKOFF_SHIFT;
        // 100 failures should NOT increase the step beyond the max.
        assert!(should_retry_now(100, max_step));
        assert!(!should_retry_now(100, max_step + 1));
        // Even u32::MAX failures cap at the same step.
        assert!(should_retry_now(u32::MAX, max_step));
    }

    /// Daemon that's perpetually unhealthy and counts how many
    /// `on_start` calls it sees. Lets the backoff test prove that a
    /// persistent failure doesn't trigger a replacement at every
    /// monitor tick.
    struct PerpetuallyUnhealthyDaemon {
        starts: AtomicU64,
        stops: AtomicU64,
    }

    impl PerpetuallyUnhealthyDaemon {
        fn new() -> Self {
            Self {
                starts: AtomicU64::new(0),
                stops: AtomicU64::new(0),
            }
        }
    }

    #[async_trait]
    impl LifecycleDaemon for PerpetuallyUnhealthyDaemon {
        fn name(&self) -> &str {
            "perpetually-unhealthy"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
        async fn health(&self) -> ReplicaHealth {
            ReplicaHealth::unhealthy("never-recovers")
        }
    }

    #[tokio::test]
    async fn monitor_backoff_throttles_replaces_after_consecutive_failures() {
        // A daemon that's always unhealthy is a replace candidate on
        // every poll. With backoff, the replacement count grows
        // logarithmically with elapsed time, not linearly with tick
        // count. After enough consecutive failures the monitor
        // should be skipping most ticks.
        let group = LifecycleGroup::<PerpetuallyUnhealthyDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(PerpetuallyUnhealthyDaemon::new())
        })
        .await
        .expect("spawn");
        let group = Arc::new(AsyncMutex::new(Some(group)));
        let monitor = HealthMonitor::spawn(
            group.clone(),
            |_idx| Arc::new(PerpetuallyUnhealthyDaemon::new()),
            Duration::from_millis(15),
        );

        // Run for many monitor intervals: 300 ms / 15 ms ≈ 20 ticks.
        // Without backoff that would be ~20 replace attempts. With
        // backoff the attempts land on ticks 0, 2, 4, 8 and 16, i.e.
        // ~5 replaces, with every other tick counted as a skip.
        tokio::time::sleep(Duration::from_millis(300)).await;

        let initiated = monitor
            .stats()
            .replacements_initiated
            .load(Ordering::Acquire);
        let skips = monitor.stats().backoff_skips.load(Ordering::Acquire);
        // Exact counts are timing-sensitive in CI, so we assert
        // directional invariants only:
        assert!(
            initiated <= 12,
            "without backoff this would be 20+; got {initiated}"
        );
        assert!(
            skips >= 3,
            "expected backoff to skip at least 3 ticks; got {skips}"
        );

        shut_down(monitor, group).await;
    }
}