// wire_framework/health_check/mod.rs

1use std::{
2    collections::HashMap,
3    fmt,
4    sync::{Arc, Mutex},
5    thread,
6    time::Duration,
7};
8
9// Public re-export for other crates to be able to implement the interface.
10pub use async_trait::async_trait;
11use futures::future;
12use serde::Serialize;
13use tokio::sync::watch;
14
15mod metrics;
16use metrics::{AppHealthCheckConfig, CheckResult, METRICS};
17
18pub mod wire;
19
20#[cfg(test)]
21mod tests;
22
23/// Health status returned as a part of `Health`.
24#[derive(Debug, Clone, Copy, PartialEq, Serialize)]
25#[serde(rename_all = "snake_case")]
26#[non_exhaustive]
27pub enum HealthStatus {
28    /// Component is initializing and is not ready yet.
29    NotReady,
30    /// Component is ready for operations.
31    Ready,
32    /// Component is affected by some non-fatal issue. The component is still considered healthy.
33    Affected,
34    /// Component has received a termination request and is in the process of shutting down.
35    /// Components that shut down instantly may skip this status and proceed directly to [`Self::ShutDown`].
36    ShuttingDown,
37    /// Component is shut down.
38    ShutDown,
39    /// Component has been abnormally interrupted by a panic.
40    Panicked,
41}
42
43impl HealthStatus {
44    /// Checks whether a component is healthy according to this status.
45    pub fn is_healthy(self) -> bool {
46        matches!(self, Self::Ready | Self::Affected)
47    }
48
49    fn priority_for_aggregation(self) -> usize {
50        match self {
51            Self::Ready => 0,
52            Self::Affected => 1,
53            Self::ShuttingDown => 2,
54            Self::ShutDown => 3,
55            Self::NotReady => 4,
56            Self::Panicked => 5,
57        }
58    }
59}
60
61/// Health of a single component.
62#[derive(Debug, Clone, Serialize, PartialEq)]
63pub struct Health {
64    status: HealthStatus,
65    /// Component-specific details allowing to assess whether the component is healthy or not.
66    #[serde(skip_serializing_if = "Option::is_none")]
67    details: Option<serde_json::Value>,
68}
69
70impl Health {
71    /// Sets health details.
72    #[must_use]
73    pub fn with_details<T: Serialize>(mut self, details: T) -> Self {
74        let details = serde_json::to_value(details).expect("Failed serializing `Health` details");
75        self.details = Some(details);
76        self
77    }
78
79    /// Returns the overall health status.
80    pub fn status(&self) -> HealthStatus {
81        self.status
82    }
83
84    /// Returns health details. Mostly useful for testing.
85    pub fn details(&self) -> Option<&serde_json::Value> {
86        self.details.as_ref()
87    }
88}
89
90impl From<HealthStatus> for Health {
91    fn from(status: HealthStatus) -> Self {
92        Self {
93            status,
94            details: None,
95        }
96    }
97}
98
99#[derive(Debug, thiserror::Error)]
100#[non_exhaustive]
101pub enum AppHealthCheckError {
102    /// Component is redefined.
103    #[error("cannot insert health check for component `{0}`: it is redefined")]
104    RedefinedComponent(&'static str),
105}
106
107/// Application health check aggregating health from multiple components.
108#[derive(Debug)]
109pub struct AppHealthCheck {
110    inner: Mutex<AppHealthCheckInner>,
111}
112
113#[derive(Debug, Clone)]
114struct AppHealthCheckInner {
115    /// Application-level health details.
116    app_details: Option<serde_json::Value>,
117    components: Vec<Arc<dyn CheckHealth>>,
118    slow_time_limit: Duration,
119    hard_time_limit: Duration,
120}
121
122impl Default for AppHealthCheck {
123    fn default() -> Self {
124        Self::new(None, None)
125    }
126}
127
128impl AppHealthCheck {
129    const DEFAULT_SLOW_TIME_LIMIT: Duration = Duration::from_millis(500);
130    const DEFAULT_HARD_TIME_LIMIT: Duration = Duration::from_secs(3);
131
132    pub fn new(slow_time_limit: Option<Duration>, hard_time_limit: Option<Duration>) -> Self {
133        let slow_time_limit = slow_time_limit.unwrap_or(Self::DEFAULT_SLOW_TIME_LIMIT);
134        let hard_time_limit = hard_time_limit.unwrap_or(Self::DEFAULT_HARD_TIME_LIMIT);
135        tracing::debug!(
136            "Created app health with time limits: slow={slow_time_limit:?}, hard={hard_time_limit:?}"
137        );
138
139        let inner = AppHealthCheckInner {
140            components: Vec::default(),
141            app_details: None,
142            slow_time_limit,
143            hard_time_limit,
144        };
145        Self {
146            inner: Mutex::new(inner),
147        }
148    }
149
150    pub fn override_limits(
151        &self,
152        slow_time_limit: Option<Duration>,
153        hard_time_limit: Option<Duration>,
154    ) {
155        let mut guard = self.inner.lock().expect("`AppHealthCheck` is poisoned");
156        if let Some(slow_time_limit) = slow_time_limit {
157            guard.slow_time_limit = slow_time_limit;
158        }
159        if let Some(hard_time_limit) = hard_time_limit {
160            guard.hard_time_limit = hard_time_limit;
161        }
162        tracing::debug!(
163            "Overridden app health time limits: slow={:?}, hard={:?}",
164            guard.slow_time_limit,
165            guard.hard_time_limit
166        );
167    }
168
169    /// Sets the info metrics for the metrics time limits.
170    /// This method should be called at most once when all the health checks are collected.
171    pub fn expose_metrics(&self) {
172        let config = {
173            let inner = self.inner.lock().expect("`AppHealthCheck` is poisoned");
174            AppHealthCheckConfig {
175                slow_time_limit: inner.slow_time_limit.into(),
176                hard_time_limit: inner.hard_time_limit.into(),
177            }
178        };
179        if METRICS.info.set(config).is_err() {
180            tracing::warn!(
181                "App health redefined; previous config: {:?}",
182                METRICS.info.get()
183            );
184        }
185    }
186
187    /// Sets app-level health details. They can include build info etc.
188    pub fn set_details(&self, details: impl Serialize) {
189        let details = serde_json::to_value(details).expect("failed serializing app details");
190        let mut inner = self.inner.lock().expect("`AppHealthCheck` is poisoned");
191        inner.app_details = Some(details);
192    }
193
194    /// Inserts health check for a component.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if the component with the same name is already defined.
199    pub fn insert_component(
200        &self,
201        health_check: ReactiveHealthCheck,
202    ) -> Result<(), AppHealthCheckError> {
203        self.insert_custom_component(Arc::new(health_check))
204    }
205
206    /// Inserts a custom health check for a component.
207    ///
208    /// # Errors
209    ///
210    /// Returns an error if the component with the same name is already defined.
211    pub fn insert_custom_component(
212        &self,
213        health_check: Arc<dyn CheckHealth>,
214    ) -> Result<(), AppHealthCheckError> {
215        let health_check_name = health_check.name();
216        let mut guard = self.inner.lock().expect("`AppHealthCheck` is poisoned");
217        if guard
218            .components
219            .iter()
220            .any(|check| check.name() == health_check_name)
221        {
222            return Err(AppHealthCheckError::RedefinedComponent(health_check_name));
223        }
224        guard.components.push(health_check);
225        Ok(())
226    }
227
228    /// Checks the overall application health. This will query all component checks concurrently.
229    pub async fn check_health(&self) -> AppHealth {
230        // Clone `inner` so that we don't hold a lock for them across a wait point.
231        let AppHealthCheckInner {
232            components,
233            app_details,
234            slow_time_limit,
235            hard_time_limit,
236        } = self
237            .inner
238            .lock()
239            .expect("`AppHealthCheck` is poisoned")
240            .clone();
241
242        let check_futures = components.iter().map(|check| {
243            Self::check_health_with_time_limit(check.as_ref(), slow_time_limit, hard_time_limit)
244        });
245        let components: HashMap<_, _> = future::join_all(check_futures).await.into_iter().collect();
246
247        let aggregated_status = components
248            .values()
249            .map(|health| health.status)
250            .max_by_key(|status| status.priority_for_aggregation())
251            .unwrap_or(HealthStatus::Ready);
252        let mut inner = Health::from(aggregated_status);
253        inner.details = app_details.clone();
254
255        let health = AppHealth { inner, components };
256        if !health.inner.status.is_healthy() {
257            // Only log non-ready application health so that logs are not spammed without a reason.
258            tracing::debug!("Aggregated application health: {health:?}");
259        }
260        health
261    }
262
263    async fn check_health_with_time_limit(
264        check: &dyn CheckHealth,
265        slow_time_limit: Duration,
266        hard_time_limit: Duration,
267    ) -> (&'static str, Health) {
268        struct DropGuard {
269            check_name: &'static str,
270            started_at: tokio::time::Instant,
271            hard_time_limit: Duration,
272            is_armed: bool,
273        }
274
275        impl Drop for DropGuard {
276            fn drop(&mut self) {
277                if !self.is_armed {
278                    return;
279                }
280
281                let elapsed = self.started_at.elapsed();
282                let &mut Self {
283                    check_name,
284                    hard_time_limit,
285                    ..
286                } = self;
287                tracing::warn!(
288                    "Health check `{check_name}` was dropped before completion after {elapsed:?}; \
289                     check the configured check timeout ({hard_time_limit:?}) and health check logic"
290                );
291                METRICS.observe_abnormal_check(check_name, CheckResult::Dropped, elapsed);
292            }
293        }
294
295        let check_name = check.name();
296        let started_at = tokio::time::Instant::now();
297        let mut drop_guard = DropGuard {
298            check_name,
299            started_at,
300            hard_time_limit,
301            is_armed: true,
302        };
303        let timeout_at = started_at + hard_time_limit;
304
305        let result = tokio::time::timeout_at(timeout_at, check.check_health()).await;
306        drop_guard.is_armed = false;
307        let elapsed = started_at.elapsed();
308        match result {
309            Ok(output) => {
310                if elapsed > slow_time_limit {
311                    tracing::info!(
312                        "Health check `{check_name}` took >{slow_time_limit:?} to complete: {elapsed:?}"
313                    );
314                    METRICS.observe_abnormal_check(check_name, CheckResult::Slow, elapsed);
315                }
316                (check_name, output)
317            }
318            Err(_) => {
319                tracing::warn!(
320                    "Health check `{check_name}` timed out, taking >{hard_time_limit:?} to complete; marking as not ready"
321                );
322                METRICS.observe_abnormal_check(check_name, CheckResult::TimedOut, elapsed);
323                (check_name, HealthStatus::NotReady.into())
324            }
325        }
326    }
327}
328
329/// Health information for an application consisting of multiple components.
330#[derive(Debug, Serialize)]
331pub struct AppHealth {
332    #[serde(flatten)]
333    inner: Health,
334    components: HashMap<&'static str, Health>,
335}
336
337impl AppHealth {
338    pub fn is_healthy(&self) -> bool {
339        self.inner.status.is_healthy()
340    }
341
342    /// Returns a reference to the overall health of the application.
343    pub fn inner(&self) -> &Health {
344        &self.inner
345    }
346
347    /// Returns a reference to the component information.
348    pub fn components(&self) -> &HashMap<&'static str, Health> {
349        &self.components
350    }
351}
352
353/// Interface to be used for health checks.
354#[async_trait]
355pub trait CheckHealth: Send + Sync + 'static {
356    /// Unique name of the component.
357    fn name(&self) -> &'static str;
358    /// Checks health of the component.
359    async fn check_health(&self) -> Health;
360}
361
362impl fmt::Debug for dyn CheckHealth {
363    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
364        formatter
365            .debug_struct("CheckHealth")
366            .field("name", &self.name())
367            .finish()
368    }
369}
370
371#[async_trait]
372impl<T: CheckHealth + ?Sized> CheckHealth for Arc<T> {
373    fn name(&self) -> &'static str {
374        (**self).name()
375    }
376
377    async fn check_health(&self) -> Health {
378        (**self).check_health().await
379    }
380}
381
382/// Basic implementation of [`CheckHealth`] trait that can be updated using a matching [`HealthUpdater`].
383#[derive(Debug, Clone)]
384pub struct ReactiveHealthCheck {
385    name: &'static str,
386    health_receiver: watch::Receiver<Health>,
387}
388
389impl ReactiveHealthCheck {
390    /// Creates a health check together with an updater that can be used to update it.
391    /// The check will return [`HealthStatus::NotReady`] initially.
392    pub fn new(name: &'static str) -> (Self, HealthUpdater) {
393        let (health_sender, health_receiver) = watch::channel(HealthStatus::NotReady.into());
394        let this = Self {
395            name,
396            health_receiver,
397        };
398        let updater = HealthUpdater {
399            name,
400            should_track_drop: true,
401            health_sender,
402        };
403        (this, updater)
404    }
405
406    /// Waits until the specified `condition` is true for the tracked [`Health`], and returns health.
407    /// Mostly useful for testing.
408    ///
409    /// If the health updater associated with this check is dropped, this method can wait indefinitely.
410    pub async fn wait_for(&mut self, condition: impl FnMut(&Health) -> bool) -> Health {
411        match self.health_receiver.wait_for(condition).await {
412            Ok(health) => health.clone(),
413            Err(_) => future::pending().await,
414        }
415    }
416}
417
418#[async_trait]
419impl CheckHealth for ReactiveHealthCheck {
420    fn name(&self) -> &'static str {
421        self.name
422    }
423
424    async fn check_health(&self) -> Health {
425        self.health_receiver.borrow().clone()
426    }
427}
428
429/// Updater for [`ReactiveHealthCheck`]. Can be created using [`ReactiveHealthCheck::new()`].
430///
431/// On drop, will automatically update status to [`HealthStatus::ShutDown`], or to [`HealthStatus::Panicked`]
432/// if the dropping thread is panicking, unless the drop is performed using [`Self::freeze()`].
433#[derive(Debug)]
434pub struct HealthUpdater {
435    name: &'static str,
436    should_track_drop: bool,
437    health_sender: watch::Sender<Health>,
438}
439
440impl HealthUpdater {
441    /// Updates the health check information, returning if a change occurred from previous state.
442    /// Note, description change on Health is counted as a change, even if status is the same.
443    /// I.e., `Health { Ready, None }` to `Health { Ready, Some(_) }` is considered a change.
444    pub fn update(&self, health: Health) -> bool {
445        let old_health = self.health_sender.send_replace(health.clone());
446        if old_health != health {
447            tracing::debug!(
448                "Changed health of `{}` from {} to {}",
449                self.name,
450                serde_json::to_string(&old_health).unwrap_or_else(|_| format!("{old_health:?}")),
451                serde_json::to_string(&health).unwrap_or_else(|_| format!("{health:?}"))
452            );
453            return true;
454        }
455        false
456    }
457
458    /// Closes this updater so that the corresponding health check can no longer be updated, not even if the updater is dropped.
459    pub fn freeze(mut self) {
460        self.should_track_drop = false;
461    }
462
463    /// Creates a [`ReactiveHealthCheck`] attached to this updater. This allows not retaining the initial health check
464    /// returned by [`ReactiveHealthCheck::new()`].
465    pub fn subscribe(&self) -> ReactiveHealthCheck {
466        ReactiveHealthCheck {
467            name: self.name,
468            health_receiver: self.health_sender.subscribe(),
469        }
470    }
471}
472
473impl Drop for HealthUpdater {
474    fn drop(&mut self) {
475        if !self.should_track_drop {
476            return;
477        }
478
479        let terminal_health = if thread::panicking() {
480            HealthStatus::Panicked
481        } else {
482            HealthStatus::ShutDown
483        };
484        self.update(terminal_health.into());
485    }
486}