wire_framework/health_check/
mod.rs1use std::{
2 collections::HashMap,
3 fmt,
4 sync::{Arc, Mutex},
5 thread,
6 time::Duration,
7};
8
9pub use async_trait::async_trait;
11use futures::future;
12use serde::Serialize;
13use tokio::sync::watch;
14
15mod metrics;
16use metrics::{AppHealthCheckConfig, CheckResult, METRICS};
17
18pub mod wire;
19
20#[cfg(test)]
21mod tests;
22
23#[derive(Debug, Clone, Copy, PartialEq, Serialize)]
25#[serde(rename_all = "snake_case")]
26#[non_exhaustive]
27pub enum HealthStatus {
28 NotReady,
30 Ready,
32 Affected,
34 ShuttingDown,
37 ShutDown,
39 Panicked,
41}
42
43impl HealthStatus {
44 pub fn is_healthy(self) -> bool {
46 matches!(self, Self::Ready | Self::Affected)
47 }
48
49 fn priority_for_aggregation(self) -> usize {
50 match self {
51 Self::Ready => 0,
52 Self::Affected => 1,
53 Self::ShuttingDown => 2,
54 Self::ShutDown => 3,
55 Self::NotReady => 4,
56 Self::Panicked => 5,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, PartialEq)]
63pub struct Health {
64 status: HealthStatus,
65 #[serde(skip_serializing_if = "Option::is_none")]
67 details: Option<serde_json::Value>,
68}
69
70impl Health {
71 #[must_use]
73 pub fn with_details<T: Serialize>(mut self, details: T) -> Self {
74 let details = serde_json::to_value(details).expect("Failed serializing `Health` details");
75 self.details = Some(details);
76 self
77 }
78
79 pub fn status(&self) -> HealthStatus {
81 self.status
82 }
83
84 pub fn details(&self) -> Option<&serde_json::Value> {
86 self.details.as_ref()
87 }
88}
89
90impl From<HealthStatus> for Health {
91 fn from(status: HealthStatus) -> Self {
92 Self {
93 status,
94 details: None,
95 }
96 }
97}
98
99#[derive(Debug, thiserror::Error)]
100#[non_exhaustive]
101pub enum AppHealthCheckError {
102 #[error("cannot insert health check for component `{0}`: it is redefined")]
104 RedefinedComponent(&'static str),
105}
106
107#[derive(Debug)]
109pub struct AppHealthCheck {
110 inner: Mutex<AppHealthCheckInner>,
111}
112
113#[derive(Debug, Clone)]
114struct AppHealthCheckInner {
115 app_details: Option<serde_json::Value>,
117 components: Vec<Arc<dyn CheckHealth>>,
118 slow_time_limit: Duration,
119 hard_time_limit: Duration,
120}
121
122impl Default for AppHealthCheck {
123 fn default() -> Self {
124 Self::new(None, None)
125 }
126}
127
128impl AppHealthCheck {
129 const DEFAULT_SLOW_TIME_LIMIT: Duration = Duration::from_millis(500);
130 const DEFAULT_HARD_TIME_LIMIT: Duration = Duration::from_secs(3);
131
132 pub fn new(slow_time_limit: Option<Duration>, hard_time_limit: Option<Duration>) -> Self {
133 let slow_time_limit = slow_time_limit.unwrap_or(Self::DEFAULT_SLOW_TIME_LIMIT);
134 let hard_time_limit = hard_time_limit.unwrap_or(Self::DEFAULT_HARD_TIME_LIMIT);
135 tracing::debug!(
136 "Created app health with time limits: slow={slow_time_limit:?}, hard={hard_time_limit:?}"
137 );
138
139 let inner = AppHealthCheckInner {
140 components: Vec::default(),
141 app_details: None,
142 slow_time_limit,
143 hard_time_limit,
144 };
145 Self {
146 inner: Mutex::new(inner),
147 }
148 }
149
150 pub fn override_limits(
151 &self,
152 slow_time_limit: Option<Duration>,
153 hard_time_limit: Option<Duration>,
154 ) {
155 let mut guard = self.inner.lock().expect("`AppHealthCheck` is poisoned");
156 if let Some(slow_time_limit) = slow_time_limit {
157 guard.slow_time_limit = slow_time_limit;
158 }
159 if let Some(hard_time_limit) = hard_time_limit {
160 guard.hard_time_limit = hard_time_limit;
161 }
162 tracing::debug!(
163 "Overridden app health time limits: slow={:?}, hard={:?}",
164 guard.slow_time_limit,
165 guard.hard_time_limit
166 );
167 }
168
169 pub fn expose_metrics(&self) {
172 let config = {
173 let inner = self.inner.lock().expect("`AppHealthCheck` is poisoned");
174 AppHealthCheckConfig {
175 slow_time_limit: inner.slow_time_limit.into(),
176 hard_time_limit: inner.hard_time_limit.into(),
177 }
178 };
179 if METRICS.info.set(config).is_err() {
180 tracing::warn!(
181 "App health redefined; previous config: {:?}",
182 METRICS.info.get()
183 );
184 }
185 }
186
187 pub fn set_details(&self, details: impl Serialize) {
189 let details = serde_json::to_value(details).expect("failed serializing app details");
190 let mut inner = self.inner.lock().expect("`AppHealthCheck` is poisoned");
191 inner.app_details = Some(details);
192 }
193
194 pub fn insert_component(
200 &self,
201 health_check: ReactiveHealthCheck,
202 ) -> Result<(), AppHealthCheckError> {
203 self.insert_custom_component(Arc::new(health_check))
204 }
205
206 pub fn insert_custom_component(
212 &self,
213 health_check: Arc<dyn CheckHealth>,
214 ) -> Result<(), AppHealthCheckError> {
215 let health_check_name = health_check.name();
216 let mut guard = self.inner.lock().expect("`AppHealthCheck` is poisoned");
217 if guard
218 .components
219 .iter()
220 .any(|check| check.name() == health_check_name)
221 {
222 return Err(AppHealthCheckError::RedefinedComponent(health_check_name));
223 }
224 guard.components.push(health_check);
225 Ok(())
226 }
227
228 pub async fn check_health(&self) -> AppHealth {
230 let AppHealthCheckInner {
232 components,
233 app_details,
234 slow_time_limit,
235 hard_time_limit,
236 } = self
237 .inner
238 .lock()
239 .expect("`AppHealthCheck` is poisoned")
240 .clone();
241
242 let check_futures = components.iter().map(|check| {
243 Self::check_health_with_time_limit(check.as_ref(), slow_time_limit, hard_time_limit)
244 });
245 let components: HashMap<_, _> = future::join_all(check_futures).await.into_iter().collect();
246
247 let aggregated_status = components
248 .values()
249 .map(|health| health.status)
250 .max_by_key(|status| status.priority_for_aggregation())
251 .unwrap_or(HealthStatus::Ready);
252 let mut inner = Health::from(aggregated_status);
253 inner.details = app_details.clone();
254
255 let health = AppHealth { inner, components };
256 if !health.inner.status.is_healthy() {
257 tracing::debug!("Aggregated application health: {health:?}");
259 }
260 health
261 }
262
263 async fn check_health_with_time_limit(
264 check: &dyn CheckHealth,
265 slow_time_limit: Duration,
266 hard_time_limit: Duration,
267 ) -> (&'static str, Health) {
268 struct DropGuard {
269 check_name: &'static str,
270 started_at: tokio::time::Instant,
271 hard_time_limit: Duration,
272 is_armed: bool,
273 }
274
275 impl Drop for DropGuard {
276 fn drop(&mut self) {
277 if !self.is_armed {
278 return;
279 }
280
281 let elapsed = self.started_at.elapsed();
282 let &mut Self {
283 check_name,
284 hard_time_limit,
285 ..
286 } = self;
287 tracing::warn!(
288 "Health check `{check_name}` was dropped before completion after {elapsed:?}; \
289 check the configured check timeout ({hard_time_limit:?}) and health check logic"
290 );
291 METRICS.observe_abnormal_check(check_name, CheckResult::Dropped, elapsed);
292 }
293 }
294
295 let check_name = check.name();
296 let started_at = tokio::time::Instant::now();
297 let mut drop_guard = DropGuard {
298 check_name,
299 started_at,
300 hard_time_limit,
301 is_armed: true,
302 };
303 let timeout_at = started_at + hard_time_limit;
304
305 let result = tokio::time::timeout_at(timeout_at, check.check_health()).await;
306 drop_guard.is_armed = false;
307 let elapsed = started_at.elapsed();
308 match result {
309 Ok(output) => {
310 if elapsed > slow_time_limit {
311 tracing::info!(
312 "Health check `{check_name}` took >{slow_time_limit:?} to complete: {elapsed:?}"
313 );
314 METRICS.observe_abnormal_check(check_name, CheckResult::Slow, elapsed);
315 }
316 (check_name, output)
317 }
318 Err(_) => {
319 tracing::warn!(
320 "Health check `{check_name}` timed out, taking >{hard_time_limit:?} to complete; marking as not ready"
321 );
322 METRICS.observe_abnormal_check(check_name, CheckResult::TimedOut, elapsed);
323 (check_name, HealthStatus::NotReady.into())
324 }
325 }
326 }
327}
328
329#[derive(Debug, Serialize)]
331pub struct AppHealth {
332 #[serde(flatten)]
333 inner: Health,
334 components: HashMap<&'static str, Health>,
335}
336
337impl AppHealth {
338 pub fn is_healthy(&self) -> bool {
339 self.inner.status.is_healthy()
340 }
341
342 pub fn inner(&self) -> &Health {
344 &self.inner
345 }
346
347 pub fn components(&self) -> &HashMap<&'static str, Health> {
349 &self.components
350 }
351}
352
353#[async_trait]
355pub trait CheckHealth: Send + Sync + 'static {
356 fn name(&self) -> &'static str;
358 async fn check_health(&self) -> Health;
360}
361
362impl fmt::Debug for dyn CheckHealth {
363 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
364 formatter
365 .debug_struct("CheckHealth")
366 .field("name", &self.name())
367 .finish()
368 }
369}
370
371#[async_trait]
372impl<T: CheckHealth + ?Sized> CheckHealth for Arc<T> {
373 fn name(&self) -> &'static str {
374 (**self).name()
375 }
376
377 async fn check_health(&self) -> Health {
378 (**self).check_health().await
379 }
380}
381
382#[derive(Debug, Clone)]
384pub struct ReactiveHealthCheck {
385 name: &'static str,
386 health_receiver: watch::Receiver<Health>,
387}
388
389impl ReactiveHealthCheck {
390 pub fn new(name: &'static str) -> (Self, HealthUpdater) {
393 let (health_sender, health_receiver) = watch::channel(HealthStatus::NotReady.into());
394 let this = Self {
395 name,
396 health_receiver,
397 };
398 let updater = HealthUpdater {
399 name,
400 should_track_drop: true,
401 health_sender,
402 };
403 (this, updater)
404 }
405
406 pub async fn wait_for(&mut self, condition: impl FnMut(&Health) -> bool) -> Health {
411 match self.health_receiver.wait_for(condition).await {
412 Ok(health) => health.clone(),
413 Err(_) => future::pending().await,
414 }
415 }
416}
417
418#[async_trait]
419impl CheckHealth for ReactiveHealthCheck {
420 fn name(&self) -> &'static str {
421 self.name
422 }
423
424 async fn check_health(&self) -> Health {
425 self.health_receiver.borrow().clone()
426 }
427}
428
429#[derive(Debug)]
434pub struct HealthUpdater {
435 name: &'static str,
436 should_track_drop: bool,
437 health_sender: watch::Sender<Health>,
438}
439
440impl HealthUpdater {
441 pub fn update(&self, health: Health) -> bool {
445 let old_health = self.health_sender.send_replace(health.clone());
446 if old_health != health {
447 tracing::debug!(
448 "Changed health of `{}` from {} to {}",
449 self.name,
450 serde_json::to_string(&old_health).unwrap_or_else(|_| format!("{old_health:?}")),
451 serde_json::to_string(&health).unwrap_or_else(|_| format!("{health:?}"))
452 );
453 return true;
454 }
455 false
456 }
457
458 pub fn freeze(mut self) {
460 self.should_track_drop = false;
461 }
462
463 pub fn subscribe(&self) -> ReactiveHealthCheck {
466 ReactiveHealthCheck {
467 name: self.name,
468 health_receiver: self.health_sender.subscribe(),
469 }
470 }
471}
472
473impl Drop for HealthUpdater {
474 fn drop(&mut self) {
475 if !self.should_track_drop {
476 return;
477 }
478
479 let terminal_health = if thread::panicking() {
480 HealthStatus::Panicked
481 } else {
482 HealthStatus::ShutDown
483 };
484 self.update(terminal_health.into());
485 }
486}