use crate::utils::config::OpenCratesConfig;
use crate::utils::metrics::MetricRegistry;
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::{Mutex, RwLock};
use tokio::time::timeout;
use tracing::{debug, error, info, warn};
/// Default interval between periodic health-check runs (presumably seconds; the
/// `HealthManager` constructors in this file hard-code the same value via
/// `Duration::from_secs` rather than reading this constant).
const DEFAULT_HEALTH_CHECK_INTERVAL: u64 = 30;
/// Default per-check timeout (presumably seconds; likewise mirrored by a literal
/// in the `HealthManager` constructors rather than referenced directly).
const DEFAULT_SERVICE_TIMEOUT: u64 = 5;
/// Health state reported for a single check or for the system as a whole.
///
/// The type is serde-(de)serializable and comparable for equality.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
/// No problem reported.
Healthy,
/// Reduced health; `HealthManager::run_checks` also assigns this to checks that time out.
Degraded,
/// Failing; takes precedence over every other status in `HealthStatus::combine`.
Unhealthy,
/// Minor concern; the lowest-precedence non-healthy status in `HealthStatus::combine`.
Warning,
/// Severe failure; outranked only by `Unhealthy` in `HealthStatus::combine`.
Critical,
}
impl fmt::Display for HealthStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{self:?}")
}
}
impl HealthStatus {
    /// Merges two statuses and returns whichever is more severe.
    ///
    /// Severity from highest to lowest: `Unhealthy`, `Critical`, `Degraded`,
    /// `Warning`, `Healthy`. The operation is symmetric.
    #[must_use]
    pub fn combine(&self, other: &HealthStatus) -> HealthStatus {
        /// Numeric rank mirroring the precedence above; a larger value wins.
        fn severity(status: &HealthStatus) -> u8 {
            match status {
                HealthStatus::Healthy => 0,
                HealthStatus::Warning => 1,
                HealthStatus::Degraded => 2,
                HealthStatus::Critical => 3,
                HealthStatus::Unhealthy => 4,
            }
        }

        // Equal ranks mean equal variants, so returning `self` on a tie is safe.
        if severity(other) > severity(self) {
            other.clone()
        } else {
            self.clone()
        }
    }
}
/// Outcome of running a single health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckResult {
/// Name identifying the check that produced this result.
pub name: String,
/// Status reported by the check.
pub status: HealthStatus,
/// Optional human-readable detail accompanying the status.
pub message: Option<String>,
/// Time attributed to the check; `HealthManager::run_checks` overwrites this
/// with its own measurement.
pub duration: Duration,
}
impl CheckResult {
    /// Returns the name of this result's status variant as a static string.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        let label = match &self.status {
            HealthStatus::Healthy => "Healthy",
            HealthStatus::Warning => "Warning",
            HealthStatus::Degraded => "Degraded",
            HealthStatus::Critical => "Critical",
            HealthStatus::Unhealthy => "Unhealthy",
        };
        label
    }
}
/// A named, asynchronous health probe that can be registered with `HealthManager`.
#[async_trait::async_trait]
pub trait HealthCheck: Send + Sync {
/// Name of the check; `HealthManager` logs it on registration and uses it to
/// label results for checks that time out.
fn name(&self) -> &str;
/// Runs the probe and reports its outcome.
async fn check(&self) -> CheckResult;
}
/// Snapshot of overall health together with the individual check results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthInfo {
/// Status aggregated across all checks.
pub overall_status: HealthStatus,
/// Individual results; `HealthManager::run_checks` replaces this list on every run.
pub checks: Vec<CheckResult>,
/// Time elapsed since the owning `HealthManager` was created, as of the last run.
pub uptime: Duration,
}
impl HealthInfo {
    /// Returns the initial snapshot: `Healthy`, no check results, zero uptime.
    #[must_use]
    pub fn healthy() -> Self {
        Self {
            uptime: Duration::ZERO,
            checks: vec![],
            overall_status: HealthStatus::Healthy,
        }
    }
}
/// Owns a set of registered health checks, runs them, and caches the latest results.
pub struct HealthManager {
// Registered checks, executed together by `run_checks`.
checks: Vec<Box<dyn HealthCheck + Send + Sync>>,
// Most recent snapshot; written by `run_checks`, read by the getters.
statuses: tokio::sync::RwLock<HealthInfo>,
// Metric registry supplied at construction; not read by the code visible in this file.
metrics: MetricRegistry,
// Period used by `start_periodic_checks`.
check_interval: Duration,
// Maximum time a single check may take before it is reported as timed out.
timeout: Duration,
// Construction time, used to compute `HealthInfo::uptime`.
start_time: Instant,
}
impl HealthManager {
pub async fn new() -> anyhow::Result<Self> {
Ok(Self {
checks: Vec::new(),
statuses: tokio::sync::RwLock::new(HealthInfo::healthy()),
metrics: MetricRegistry::default(),
check_interval: Duration::from_secs(30),
timeout: Duration::from_secs(5),
start_time: Instant::now(),
})
}
pub async fn new_with_registry(registry: MetricRegistry) -> anyhow::Result<Self> {
Ok(Self {
checks: Vec::new(),
statuses: tokio::sync::RwLock::new(HealthInfo::healthy()),
metrics: registry,
check_interval: Duration::from_secs(30),
timeout: Duration::from_secs(5),
start_time: Instant::now(),
})
}
pub async fn new_default() -> anyhow::Result<Self> {
Self::new().await
}
pub async fn get_health_status(&self) -> HealthStatus {
self.statuses.read().await.overall_status.clone()
}
pub async fn get_health_info(&self) -> HealthInfo {
self.statuses.read().await.clone()
}
pub fn register_check(&mut self, check: Box<dyn HealthCheck + Send + Sync>) {
info!("Registering health check: {}", check.name());
self.checks.push(check);
}
pub async fn run_checks(&self) {
let mut results = Vec::new();
let mut overall_status = HealthStatus::Healthy;
let check_futures = self.checks.iter().map(|check| {
let start = Instant::now();
let timeout = self.timeout;
async move {
let check_future = check.check();
match tokio::time::timeout(timeout, check_future).await {
Ok(mut result) => {
result.duration = start.elapsed();
result
}
Err(_) => {
CheckResult {
name: check.name().to_string(),
status: HealthStatus::Degraded,
message: Some(format!("Health check timed out after {timeout:?}")),
duration: start.elapsed(),
}
}
}
}
});
let check_results: Vec<_> = futures::future::join_all(check_futures).await;
for result in check_results {
match result.status {
HealthStatus::Healthy => {
}
HealthStatus::Degraded => {
if overall_status == HealthStatus::Healthy {
overall_status = HealthStatus::Degraded;
}
}
HealthStatus::Unhealthy => {
overall_status = HealthStatus::Unhealthy;
}
HealthStatus::Critical => {
overall_status = HealthStatus::Unhealthy;
}
HealthStatus::Warning => {
if overall_status == HealthStatus::Healthy {
overall_status = HealthStatus::Degraded;
}
}
}
results.push(result);
}
let mut statuses = self.statuses.write().await;
statuses.checks = results;
statuses.overall_status = overall_status.clone();
statuses.uptime = self.start_time.elapsed();
debug!(
"Health checks completed. Overall status: {:?}",
overall_status
);
}
pub fn start_periodic_checks(self: Arc<Self>) {
let self_clone = self.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(self_clone.check_interval);
loop {
interval.tick().await;
self_clone.run_checks().await;
}
});
}
}
/// Translates cached health information into `(HTTP status code, body)` pairs.
pub struct HealthMiddleware {
// Source of the cached health snapshot.
manager: Arc<HealthManager>,
// Request path that `handle_request` answers; compared for exact equality.
endpoint: String,
}
impl HealthMiddleware {
pub fn new(manager: Arc<HealthManager>, endpoint: String) -> Self {
Self { manager, endpoint }
}
pub async fn handle_request(&self, path: &str) -> Result<(u16, String)> {
if path != self.endpoint {
return Ok((404, "Not Found".to_string()));
}
let health = self.manager.get_health_status().await;
let status_code = match health {
HealthStatus::Healthy => 200,
HealthStatus::Degraded => 200, HealthStatus::Unhealthy => 503,
HealthStatus::Critical => 503,
HealthStatus::Warning => 200,
};
let response = serde_json::to_string_pretty(&health)
.unwrap_or_else(|_| "Internal Server Error".to_string());
Ok((status_code, response))
}
pub async fn handle_detailed_request(&self, check_name: Option<&str>) -> Result<(u16, String)> {
match check_name {
Some(name) => {
let health_info = self.manager.get_health_info().await;
let result = health_info.checks.iter().find(|r| r.name == *name);
if let Some(result) = result {
let status_code = match result.status {
HealthStatus::Healthy => 200,
HealthStatus::Degraded => 200,
HealthStatus::Unhealthy => 503,
HealthStatus::Critical => 503,
HealthStatus::Warning => 200,
};
let response = serde_json::to_string_pretty(result)
.unwrap_or_else(|_| "Internal Server Error".to_string());
Ok((status_code, response))
} else {
Ok((404, format!("Health check '{name}' not found")))
}
}
None => self.handle_request(&self.endpoint).await,
}
}
}
/// Circuit breaker that wraps health-check executions and short-circuits them
/// after repeated failures.
pub struct HealthCircuitBreaker {
// Number of recorded failures at which the breaker opens.
failure_threshold: usize,
// Time that must pass after the last failure before an open breaker lets a call through.
recovery_timeout: Duration,
// Failure counter; reset to zero by a `Healthy` result.
failure_count: Arc<Mutex<usize>>,
// Instant of the most recent `Unhealthy`/`Critical` result, if any.
last_failure: Arc<Mutex<Option<Instant>>>,
// Current breaker state.
state: Arc<Mutex<CircuitBreakerState>>,
}
/// Internal state of a `HealthCircuitBreaker`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CircuitBreakerState {
    /// Checks are executed normally.
    Closed,
    /// Checks are short-circuited until the recovery timeout has elapsed.
    Open,
    /// The recovery timeout elapsed and a call is being let through as a probe.
    HalfOpen,
}
impl HealthCircuitBreaker {
    /// Creates a closed breaker that opens once `failure_threshold` failures have
    /// been recorded and allows a retry `recovery_timeout` after the last failure.
    #[must_use]
    pub fn new(failure_threshold: usize, recovery_timeout: Duration) -> Self {
        Self {
            failure_threshold,
            recovery_timeout,
            state: Arc::new(Mutex::new(CircuitBreakerState::Closed)),
            failure_count: Arc::new(Mutex::new(0)),
            last_failure: Arc::new(Mutex::new(None)),
        }
    }

    /// Runs `check_fn` through the breaker.
    ///
    /// While the breaker is open and the recovery timeout has not elapsed,
    /// `check_fn` is not called and an `Unhealthy` result named `circuit_breaker`
    /// is returned. Otherwise the check runs and its result updates the breaker.
    pub async fn execute<F, Fut>(&self, check_fn: F) -> CheckResult
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = CheckResult>,
    {
        // Copy the state out so no lock is held while the check itself runs.
        let current = *self.state.lock().await;
        if current != CircuitBreakerState::Open {
            return self.execute_and_handle_result(check_fn).await;
        }

        let recovery_elapsed = {
            let last_failure = self.last_failure.lock().await;
            match *last_failure {
                Some(failed_at) => failed_at.elapsed() >= self.recovery_timeout,
                // No recorded failure: nothing to wait for.
                None => true,
            }
        };
        if !recovery_elapsed {
            return CheckResult {
                name: "circuit_breaker".to_string(),
                status: HealthStatus::Unhealthy,
                message: Some("Circuit breaker is open".to_string()),
                duration: Duration::ZERO,
            };
        }

        // Let a single call through as a probe.
        *self.state.lock().await = CircuitBreakerState::HalfOpen;
        self.execute_and_handle_result(check_fn).await
    }

    /// Runs `check_fn` and updates the breaker from the result.
    ///
    /// `Healthy` resets the failure counter and closes the breaker;
    /// `Unhealthy`/`Critical` records a failure and opens the breaker once the
    /// threshold is reached; `Degraded`/`Warning` leaves the breaker untouched.
    async fn execute_and_handle_result<F, Fut>(&self, check_fn: F) -> CheckResult
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = CheckResult>,
    {
        let result = check_fn().await;
        match result.status {
            HealthStatus::Degraded | HealthStatus::Warning => {}
            HealthStatus::Healthy => {
                *self.failure_count.lock().await = 0;
                *self.state.lock().await = CircuitBreakerState::Closed;
            }
            HealthStatus::Unhealthy | HealthStatus::Critical => {
                // Each lock is released before the next is taken.
                let failures = {
                    let mut counter = self.failure_count.lock().await;
                    *counter += 1;
                    *counter
                };
                {
                    let mut last_failure = self.last_failure.lock().await;
                    *last_failure = Some(Instant::now());
                }
                if failures >= self.failure_threshold {
                    *self.state.lock().await = CircuitBreakerState::Open;
                }
            }
        }
        result
    }
}
/// Combines the overall status of several `HealthManager`s into one status.
pub struct HealthAggregator {
// Managers whose overall status is aggregated.
managers: Vec<Arc<HealthManager>>,
// Per-check weights stored by `set_check_weight`; not read by the code visible in this file.
weights: HashMap<String, f64>,
}
impl Default for HealthAggregator {
    /// Equivalent to [`HealthAggregator::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl HealthAggregator {
    /// Creates an aggregator with no managers and no weights.
    #[must_use]
    pub fn new() -> Self {
        Self {
            managers: Vec::new(),
            weights: HashMap::new(),
        }
    }

    /// Registers `manager` for aggregation.
    ///
    /// NOTE(review): `_weight` is accepted for API compatibility but is currently
    /// ignored; aggregation is unweighted.
    pub fn add_manager(&mut self, manager: Arc<HealthManager>, _weight: f64) {
        self.managers.push(manager);
    }

    /// Stores a weight for the check named `check_name`, replacing any previous
    /// value. The stored weights are not consulted by `get_aggregated_health`.
    pub fn set_check_weight(&mut self, check_name: String, weight: f64) {
        self.weights.insert(check_name, weight);
    }

    /// Returns the worst overall status across all registered managers.
    ///
    /// `Unhealthy` or `Critical` on any manager yields `Unhealthy` immediately;
    /// otherwise `Degraded` or `Warning` on any manager yields `Degraded`;
    /// otherwise (including when no managers are registered) `Healthy`.
    pub async fn get_aggregated_health(&self) -> HealthStatus {
        let mut overall_status = HealthStatus::Healthy;
        for manager in &self.managers {
            // Exhaustive on purpose: a catch-all arm would silently treat
            // `Critical`/`Warning` (and any future variant) as healthy.
            match manager.get_health_status().await {
                HealthStatus::Unhealthy | HealthStatus::Critical => {
                    return HealthStatus::Unhealthy;
                }
                HealthStatus::Degraded | HealthStatus::Warning => {
                    overall_status = HealthStatus::Degraded;
                }
                HealthStatus::Healthy => {}
            }
        }
        overall_status
    }
}
/// Serializable health flag with an optional JSON payload.
///
/// The derived `Default` is the unhealthy, payload-free state.
#[derive(Debug, Clone, Default, Serialize)]
pub struct SystemStatus {
/// Optional payload; set by `SystemStatus::healthy`, `None` for `SystemStatus::unhealthy`.
pub data: Option<serde_json::Value>,
/// Whether the system is considered healthy.
pub healthy: bool,
}
impl SystemStatus {
    /// Builds a healthy status carrying `data` as its payload.
    #[must_use]
    pub fn healthy(data: serde_json::Value) -> Self {
        Self {
            healthy: true,
            data: Some(data),
        }
    }

    /// Builds an unhealthy status with no payload.
    #[must_use]
    pub fn unhealthy() -> Self {
        // The derived `Default` is exactly this state: `data: None`, `healthy: false`.
        Self::default()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::time::{sleep, Duration as TokioDuration};
/// Configurable `HealthCheck` used by the tests in this module.
#[derive(Debug)]
struct MockHealthCheck {
// Name reported by the check and copied into its results.
name: String,
// When set, `check` reports `Unhealthy`.
should_fail: Arc<AtomicBool>,
// When set, `check` sleeps for 10 seconds before answering.
should_timeout: Arc<AtomicBool>,
// Set via `with_dependencies`; not read by the code visible in this file.
dependencies: Vec<String>,
}
impl MockHealthCheck {
    /// Creates a mock named `name` that succeeds immediately and has no dependencies.
    fn new(name: String) -> Self {
        Self {
            name,
            dependencies: vec![],
            should_fail: Arc::new(AtomicBool::default()),
            should_timeout: Arc::new(AtomicBool::default()),
        }
    }

    /// Replaces the dependency list, builder-style.
    fn with_dependencies(self, dependencies: Vec<String>) -> Self {
        Self {
            dependencies,
            ..self
        }
    }

    /// Sets whether `check` reports `Unhealthy`, then hands the `Arc` back.
    fn set_should_fail(self: Arc<Self>, should_fail: bool) -> Arc<Self> {
        let flag = &self.should_fail;
        flag.store(should_fail, Ordering::Relaxed);
        self
    }

    /// Sets whether `check` sleeps for 10 seconds before answering, then hands the `Arc` back.
    fn set_should_timeout(self: Arc<Self>, should_timeout: bool) -> Arc<Self> {
        let flag = &self.should_timeout;
        flag.store(should_timeout, Ordering::Relaxed);
        self
    }
}
#[async_trait]
impl HealthCheck for MockHealthCheck {
    /// Returns the configured mock name.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Produces a result according to the flags.
    ///
    /// `should_timeout` takes priority: the check sleeps for 10 seconds and then
    /// reports `Healthy`. Otherwise `should_fail` selects `Unhealthy` over `Healthy`.
    async fn check(&self) -> CheckResult {
        let (status, message, duration) = if self.should_timeout.load(Ordering::Relaxed) {
            sleep(TokioDuration::from_secs(10)).await;
            (
                HealthStatus::Healthy,
                "Mock check (should have timed out)",
                TokioDuration::from_secs(10),
            )
        } else if self.should_fail.load(Ordering::Relaxed) {
            (
                HealthStatus::Unhealthy,
                "Mock failure",
                TokioDuration::from_millis(10),
            )
        } else {
            (
                HealthStatus::Healthy,
                "Mock check successful",
                TokioDuration::from_millis(10),
            )
        };

        CheckResult {
            name: self.name.clone(),
            status,
            message: Some(message.to_string()),
            duration,
        }
    }
}
#[async_trait]
impl HealthCheck for Arc<MockHealthCheck> {
    /// Returns the name of the wrapped mock.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Delegates to the wrapped mock's `check`.
    async fn check(&self) -> CheckResult {
        // Dereference explicitly down to `MockHealthCheck`; calling `self.check()`
        // would resolve to this impl again and recurse.
        (**self).check().await
    }
}
#[tokio::test]
async fn test_health_status_combine() {
assert_eq!(
HealthStatus::Healthy.combine(&HealthStatus::Healthy),
HealthStatus::Healthy
);
assert_eq!(
HealthStatus::Healthy.combine(&HealthStatus::Degraded),
HealthStatus::Degraded
);
assert_eq!(
HealthStatus::Degraded.combine(&HealthStatus::Unhealthy),
HealthStatus::Unhealthy
);
assert_eq!(
HealthStatus::Healthy.combine(&HealthStatus::Degraded),
HealthStatus::Degraded
);
}
/// A hand-built `CheckResult` exposes exactly the values it was given.
#[tokio::test]
async fn test_health_check_result_creation() {
    let expected_duration = TokioDuration::from_millis(50);
    let CheckResult {
        status,
        message,
        duration,
        ..
    } = CheckResult {
        name: "test".to_string(),
        status: HealthStatus::Healthy,
        message: None,
        duration: expected_duration,
    };

    assert_eq!(status, HealthStatus::Healthy);
    assert_eq!(duration, expected_duration);
    assert!(message.is_none());
}
/// A mock flagged to fail reports `Unhealthy` with the failure message.
#[tokio::test]
async fn test_mock_health_check() {
    let failing = Arc::new(MockHealthCheck::new("test".to_string())).set_should_fail(true);

    let outcome = failing.check().await;

    assert_eq!(outcome.status, HealthStatus::Unhealthy);
    let message = outcome.message.as_deref().unwrap();
    assert!(message.contains("Mock failure"));
}
#[tokio::test]
async fn test_health_manager_basic() {
let registry = crate::utils::metrics::MetricRegistry::new();
let mut manager = HealthManager::new_with_registry(registry).await.unwrap();
let check = Arc::new(MockHealthCheck::new("test_check".to_string()));
manager.register_check(Box::new(check));
manager.run_checks().await;
let health_status = manager.get_health_status().await;
assert_eq!(health_status, HealthStatus::Healthy);
}
#[tokio::test]
async fn test_health_manager_with_failure() {
let registry = crate::utils::metrics::MetricRegistry::new();
let mut manager = HealthManager::new_with_registry(registry).await.unwrap();
let check =
Arc::new(MockHealthCheck::new("failing_check".to_string())).set_should_fail(true);
manager.register_check(Box::new(check));
manager.run_checks().await;
let health_status = manager.get_health_status().await;
assert_eq!(health_status, HealthStatus::Unhealthy);
}
#[tokio::test]
async fn test_health_manager_timeout() {
let registry = crate::utils::metrics::MetricRegistry::new();
let mut manager = HealthManager::new_with_registry(registry).await.unwrap();
let check =
Arc::new(MockHealthCheck::new("timeout_check".to_string())).set_should_timeout(true);
manager.register_check(Box::new(check));
manager.run_checks().await;
let health_status = manager.get_health_status().await;
assert_eq!(health_status, HealthStatus::Degraded); }
#[tokio::test]
async fn test_health_manager_dependencies() {
let registry = crate::utils::metrics::MetricRegistry::new();
let mut manager = HealthManager::new_with_registry(registry).await.unwrap();
let check_a = Arc::new(MockHealthCheck::new("check_a".to_string()));
let check_b = Arc::new(
MockHealthCheck::new("check_b".to_string())
.with_dependencies(vec!["check_a".to_string()]),
);
manager.register_check(Box::new(check_b));
manager.register_check(Box::new(check_a));
manager.run_checks().await;
let health_status = manager.get_health_status().await;
assert_eq!(health_status, HealthStatus::Healthy);
}
#[tokio::test]
async fn test_health_manager_periodic_checks() {
let registry = crate::utils::metrics::MetricRegistry::new();
let mut manager = HealthManager::new_with_registry(registry).await.unwrap();
let check = Arc::new(MockHealthCheck::new("periodic_check".to_string()));
manager.register_check(Box::new(check));
let manager = Arc::new(manager);
manager.clone().start_periodic_checks();
sleep(TokioDuration::from_millis(200)).await;
let health_status = manager.get_health_status().await;
assert_eq!(health_status, HealthStatus::Healthy);
}
/// A hand-built `HealthInfo` keeps its overall status and all check entries.
#[tokio::test]
async fn test_health_info() {
    let checks = vec![
        CheckResult {
            name: "healthy_check".to_string(),
            status: HealthStatus::Healthy,
            message: None,
            duration: TokioDuration::from_millis(10),
        },
        CheckResult {
            name: "degraded_check".to_string(),
            status: HealthStatus::Degraded,
            message: Some("Warning".to_string()),
            duration: TokioDuration::from_millis(20),
        },
        CheckResult {
            name: "unhealthy_check".to_string(),
            status: HealthStatus::Unhealthy,
            message: Some("Error".to_string()),
            duration: TokioDuration::from_millis(30),
        },
    ];

    let health_info = HealthInfo {
        overall_status: HealthStatus::Unhealthy,
        checks,
        uptime: TokioDuration::from_secs(100),
    };

    assert_eq!(health_info.overall_status, HealthStatus::Unhealthy);
    assert_eq!(health_info.checks.len(), 3);
}
#[tokio::test]
async fn test_circuit_breaker() {
let circuit_breaker = HealthCircuitBreaker::new(2, TokioDuration::from_millis(100));
let result = circuit_breaker
.execute(|| async {
CheckResult {
name: "test_check".to_string(),
status: HealthStatus::Unhealthy,
message: Some("Failure 1".to_string()),
duration: TokioDuration::from_millis(10),
}
})
.await;
assert_eq!(result.status, HealthStatus::Unhealthy);
let result = circuit_breaker
.execute(|| async {
CheckResult {
name: "test_check".to_string(),
status: HealthStatus::Unhealthy,
message: Some("Failure 2".to_string()),
duration: TokioDuration::from_millis(10),
}
})
.await;
assert_eq!(result.status, HealthStatus::Unhealthy);
let result = circuit_breaker
.execute(|| async {
CheckResult {
name: "test_check".to_string(),
status: HealthStatus::Healthy,
message: None,
duration: TokioDuration::from_millis(10),
}
})
.await;
assert_eq!(result.status, HealthStatus::Unhealthy);
assert!(result
.message
.as_ref()
.unwrap()
.contains("Circuit breaker is open"));
sleep(TokioDuration::from_millis(150)).await;
let result = circuit_breaker
.execute(|| async {
CheckResult {
name: "test_check".to_string(),
status: HealthStatus::Healthy,
message: None,
duration: TokioDuration::from_millis(10),
}
})
.await;
assert_eq!(result.status, HealthStatus::Healthy);
}
/// Two healthy managers aggregate to `Healthy`.
#[tokio::test]
async fn test_health_aggregator() {
    let mut aggregator = HealthAggregator::new();

    for check_name in &["check1", "check2"] {
        let mut manager = HealthManager::new_with_registry(MetricRegistry::new())
            .await
            .unwrap();
        manager.register_check(Box::new(Arc::new(MockHealthCheck::new(
            check_name.to_string(),
        ))));
        manager.run_checks().await;
        aggregator.add_manager(Arc::new(manager), 1.0);
    }

    assert_eq!(
        aggregator.get_aggregated_health().await,
        HealthStatus::Healthy
    );
}
}