drogue_bazaar/app/health/run/
mod.rs1#[cfg(feature = "actix")]
2mod actix;
3
4#[cfg(feature = "actix")]
5pub use actix::HealthServer;
6
7use crate::health::{HealthCheckError, HealthChecked};
8use futures_util::stream::StreamExt;
9use serde::Deserialize;
10use serde_json::{json, Value};
11use std::future::Future;
12use std::sync::Arc;
13use tokio::runtime::Handle;
14use tokio::sync::RwLock;
15use tracing::instrument;
16
17#[derive(Clone, Debug, Deserialize)]
18pub struct HealthServerConfig {
19 #[serde(default)]
20 pub enabled: bool,
21 #[serde(default = "defaults::bind_addr")]
22 pub bind_addr: String,
23 #[serde(default = "defaults::workers")]
24 pub workers: usize,
25}
26
27mod defaults {
28 #[inline]
29 pub fn bind_addr() -> String {
30 "[::1]:9090".into()
31 }
32
33 #[inline]
34 pub fn workers() -> usize {
35 1
36 }
37}
38
39impl Default for HealthServerConfig {
40 fn default() -> Self {
41 Self {
42 enabled: false,
43 bind_addr: defaults::bind_addr(),
44 workers: defaults::workers(),
45 }
46 }
47}
48
49#[derive(Clone, Default)]
51pub struct HealthChecker {
52 checks: Arc<RwLock<Vec<Box<dyn HealthChecked>>>>,
53}
54
55impl HealthChecker {
56 #[instrument(level = "trace", skip(self), ret)]
57 pub async fn is_ready(&self) -> Vec<Result<(), HealthCheckError>> {
58 futures_util::stream::iter(self.checks.read().await.iter())
59 .then(|check| check.is_ready())
60 .collect()
61 .await
62 }
63
64 #[instrument(level = "trace", skip(self), ret)]
65 pub async fn is_alive(&self) -> Vec<Result<(), HealthCheckError>> {
66 futures_util::stream::iter(self.checks.read().await.iter())
67 .then(|check| check.is_alive())
68 .collect()
69 .await
70 }
71
72 pub fn push<C>(&self, check: C)
73 where
74 C: Into<Box<dyn HealthChecked + 'static>>,
75 {
76 let check = check.into();
77 let checks = self.checks.clone();
78 Handle::current().spawn(async move {
79 checks.write().await.push(check);
80 });
81 }
82}
83
84impl<C> Extend<C> for HealthChecker
85where
86 C: HealthChecked + 'static,
87{
88 fn extend<T: IntoIterator<Item = C>>(&mut self, iter: T) {
89 let iter: Vec<_> = iter
91 .into_iter()
92 .map(|c| Box::new(c) as Box<dyn HealthChecked>)
93 .collect();
94 let checks = self.checks.clone();
95 Handle::current().spawn(async move {
96 checks.write().await.extend(iter);
97 });
98 }
99}
100
101impl Extend<Box<dyn HealthChecked>> for HealthChecker {
102 fn extend<T: IntoIterator<Item = Box<dyn HealthChecked + 'static>>>(&mut self, iter: T) {
103 let iter: Vec<_> = iter.into_iter().collect();
105 let checks = self.checks.clone();
106 Handle::current().spawn(async move {
107 checks.write().await.extend(iter);
108 });
109 }
110}
111
112#[allow(unused)]
113async fn run_checks<F, Fut>(checker: Arc<HealthChecker>, f: F) -> (http::StatusCode, Value)
114where
115 F: FnOnce(Arc<HealthChecker>) -> Fut,
116 Fut: Future<Output = Vec<Result<(), HealthCheckError>>>,
117{
118 let result: Result<Vec<()>, _> = f(checker).await.into_iter().collect();
119
120 match result {
121 Ok(_) => (http::StatusCode::OK, json!({ "success": true})),
122 Err(_) => (
123 http::StatusCode::SERVICE_UNAVAILABLE,
124 json!({"success": false}),
125 ),
126 }
127}