drogue_bazaar/app/health/run/
mod.rs

1#[cfg(feature = "actix")]
2mod actix;
3
4#[cfg(feature = "actix")]
5pub use actix::HealthServer;
6
7use crate::health::{HealthCheckError, HealthChecked};
8use futures_util::stream::StreamExt;
9use serde::Deserialize;
10use serde_json::{json, Value};
11use std::future::Future;
12use std::sync::Arc;
13use tokio::runtime::Handle;
14use tokio::sync::RwLock;
15use tracing::instrument;
16
17#[derive(Clone, Debug, Deserialize)]
18pub struct HealthServerConfig {
19    #[serde(default)]
20    pub enabled: bool,
21    #[serde(default = "defaults::bind_addr")]
22    pub bind_addr: String,
23    #[serde(default = "defaults::workers")]
24    pub workers: usize,
25}
26
27mod defaults {
28    #[inline]
29    pub fn bind_addr() -> String {
30        "[::1]:9090".into()
31    }
32
33    #[inline]
34    pub fn workers() -> usize {
35        1
36    }
37}
38
39impl Default for HealthServerConfig {
40    fn default() -> Self {
41        Self {
42            enabled: false,
43            bind_addr: defaults::bind_addr(),
44            workers: defaults::workers(),
45        }
46    }
47}
48
49/// Internal handling of health checking.
50#[derive(Clone, Default)]
51pub struct HealthChecker {
52    checks: Arc<RwLock<Vec<Box<dyn HealthChecked>>>>,
53}
54
55impl HealthChecker {
56    #[instrument(level = "trace", skip(self), ret)]
57    pub async fn is_ready(&self) -> Vec<Result<(), HealthCheckError>> {
58        futures_util::stream::iter(self.checks.read().await.iter())
59            .then(|check| check.is_ready())
60            .collect()
61            .await
62    }
63
64    #[instrument(level = "trace", skip(self), ret)]
65    pub async fn is_alive(&self) -> Vec<Result<(), HealthCheckError>> {
66        futures_util::stream::iter(self.checks.read().await.iter())
67            .then(|check| check.is_alive())
68            .collect()
69            .await
70    }
71
72    pub fn push<C>(&self, check: C)
73    where
74        C: Into<Box<dyn HealthChecked + 'static>>,
75    {
76        let check = check.into();
77        let checks = self.checks.clone();
78        Handle::current().spawn(async move {
79            checks.write().await.push(check);
80        });
81    }
82}
83
84impl<C> Extend<C> for HealthChecker
85where
86    C: HealthChecked + 'static,
87{
88    fn extend<T: IntoIterator<Item = C>>(&mut self, iter: T) {
89        // collect first, so that we can send/spawn it
90        let iter: Vec<_> = iter
91            .into_iter()
92            .map(|c| Box::new(c) as Box<dyn HealthChecked>)
93            .collect();
94        let checks = self.checks.clone();
95        Handle::current().spawn(async move {
96            checks.write().await.extend(iter);
97        });
98    }
99}
100
101impl Extend<Box<dyn HealthChecked>> for HealthChecker {
102    fn extend<T: IntoIterator<Item = Box<dyn HealthChecked + 'static>>>(&mut self, iter: T) {
103        // collect first, so that we can send/spawn it
104        let iter: Vec<_> = iter.into_iter().collect();
105        let checks = self.checks.clone();
106        Handle::current().spawn(async move {
107            checks.write().await.extend(iter);
108        });
109    }
110}
111
112#[allow(unused)]
113async fn run_checks<F, Fut>(checker: Arc<HealthChecker>, f: F) -> (http::StatusCode, Value)
114where
115    F: FnOnce(Arc<HealthChecker>) -> Fut,
116    Fut: Future<Output = Vec<Result<(), HealthCheckError>>>,
117{
118    let result: Result<Vec<()>, _> = f(checker).await.into_iter().collect();
119
120    match result {
121        Ok(_) => (http::StatusCode::OK, json!({ "success": true})),
122        Err(_) => (
123            http::StatusCode::SERVICE_UNAVAILABLE,
124            json!({"success": false}),
125        ),
126    }
127}