use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::time;
/// Overall verdict produced by a health check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// No problem was detected.
    Healthy,
    /// The pool is not usable; the payload explains why.
    Unhealthy(String),
    /// The pool is usable but under pressure; the payload explains why.
    Degraded(String),
}

/// Outcome of a single health-check run.
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// Verdict for this run.
    pub status: HealthStatus,
    /// Time spent producing the verdict.
    pub latency: Duration,
    /// Human-readable description of the data behind the verdict.
    pub details: String,
    /// Suggested remedial actions; may be empty.
    pub recommendations: Vec<String>,
}
/// Shared atomic counters describing a connection pool, plus two timestamps.
///
/// Every field sits behind an `Arc`, so cloning a `PoolHealthMetrics` produces another
/// handle to the same underlying counters rather than a copy of their current values.
#[derive(Debug, Clone)]
pub struct PoolHealthMetrics {
    /// Open connections: bumped when a connection is created, lowered when one is closed.
    pub total_connections: Arc<AtomicUsize>,
    /// Connections currently counted as in use.
    pub active_connections: Arc<AtomicUsize>,
    /// Connections currently counted as idle.
    pub idle_connections: Arc<AtomicUsize>,
    /// Last value reported through `set_waiting_requests`.
    pub waiting_requests: Arc<AtomicUsize>,
    /// Running total of connections created (only ever incremented).
    pub connections_created: Arc<AtomicU64>,
    /// Running total of failed connection attempts (only ever incremented).
    pub connections_failed: Arc<AtomicU64>,
    /// Running total of connections closed (only ever incremented).
    pub connections_closed: Arc<AtomicU64>,
    /// Time of the latest health-check timestamp update (construction time until then).
    pub last_health_check: Arc<RwLock<Instant>>,
    /// Time of the latest `increment_idle` call (construction time until then).
    pub last_successful_connection: Arc<RwLock<Instant>>,
}
impl Default for PoolHealthMetrics {
fn default() -> Self {
Self {
total_connections: Arc::new(AtomicUsize::new(0)),
active_connections: Arc::new(AtomicUsize::new(0)),
idle_connections: Arc::new(AtomicUsize::new(0)),
waiting_requests: Arc::new(AtomicUsize::new(0)),
connections_created: Arc::new(AtomicU64::new(0)),
connections_failed: Arc::new(AtomicU64::new(0)),
connections_closed: Arc::new(AtomicU64::new(0)),
last_health_check: Arc::new(RwLock::new(Instant::now())),
last_successful_connection: Arc::new(RwLock::new(Instant::now())),
}
}
}
impl PoolHealthMetrics {
pub fn new() -> Self {
Self {
total_connections: Arc::new(AtomicUsize::new(0)),
active_connections: Arc::new(AtomicUsize::new(0)),
idle_connections: Arc::new(AtomicUsize::new(0)),
waiting_requests: Arc::new(AtomicUsize::new(0)),
connections_created: Arc::new(AtomicU64::new(0)),
connections_failed: Arc::new(AtomicU64::new(0)),
connections_closed: Arc::new(AtomicU64::new(0)),
last_health_check: Arc::new(RwLock::new(Instant::now())),
last_successful_connection: Arc::new(RwLock::new(Instant::now())),
}
}
pub fn snapshot(&self) -> PoolSnapshot {
PoolSnapshot {
total: self.total_connections.load(Ordering::Relaxed),
active: self.active_connections.load(Ordering::Relaxed),
idle: self.idle_connections.load(Ordering::Relaxed),
waiting: self.waiting_requests.load(Ordering::Relaxed),
created: self.connections_created.load(Ordering::Relaxed),
failed: self.connections_failed.load(Ordering::Relaxed),
closed: self.connections_closed.load(Ordering::Relaxed),
}
}
pub fn is_healthy(&self) -> bool {
let snapshot = self.snapshot();
snapshot.idle > 0 || snapshot.active < snapshot.total
}
pub fn should_create_connection(&self, min_connections: usize) -> bool {
let snapshot = self.snapshot();
snapshot.total < min_connections || (snapshot.active >= snapshot.total && snapshot.idle == 0)
}
pub fn increment_active(&self) {
self.active_connections.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_active(&self) {
self.active_connections.fetch_sub(1, Ordering::Relaxed);
}
pub async fn increment_idle(&self) {
self.idle_connections.fetch_add(1, Ordering::Relaxed);
*self.last_successful_connection.write().await = Instant::now();
}
pub fn decrement_idle(&self) {
self.idle_connections.fetch_sub(1, Ordering::Relaxed);
}
pub fn record_connection_created(&self) {
self.total_connections.fetch_add(1, Ordering::Relaxed);
self.active_connections.fetch_add(1, Ordering::Relaxed);
self.connections_created.fetch_add(1, Ordering::Relaxed);
}
pub fn record_connection_failed(&self) {
self.connections_failed.fetch_add(1, Ordering::Relaxed);
}
pub fn record_connection_closed(&self) {
self.total_connections.fetch_sub(1, Ordering::Relaxed);
self.connections_closed.fetch_add(1, Ordering::Relaxed);
}
pub fn set_waiting_requests(&self, count: usize) {
self.waiting_requests.store(count, Ordering::Relaxed);
}
pub async fn update_health_check_time(&self) {
*self.last_health_check.write().await = Instant::now();
}
}
/// Plain-value copy of the pool counters taken at one moment.
#[derive(Debug, Clone)]
pub struct PoolSnapshot {
    // Gauges: values that move up and down.
    /// Open connections.
    pub total: usize,
    /// Connections counted as in use.
    pub active: usize,
    /// Connections counted as idle.
    pub idle: usize,
    /// Requests waiting for a connection.
    pub waiting: usize,
    // Running tallies: values that only grow.
    /// Connections created so far.
    pub created: u64,
    /// Connection attempts that failed so far.
    pub failed: u64,
    /// Connections closed so far.
    pub closed: u64,
}
/// The three positions of a circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
    /// Calls flow normally.
    Closed,
    /// Trial calls are admitted to probe for recovery.
    HalfOpen,
    /// Calls are rejected.
    Open,
}

impl std::fmt::Display for CircuitBreakerState {
    /// Writes the lowercase, hyphenated state name (e.g. `half-open`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Closed => "closed",
            Self::HalfOpen => "half-open",
            Self::Open => "open",
        };
        f.write_str(label)
    }
}
/// Error returned when a circuit breaker refuses to admit a call.
#[derive(Debug, Error, Clone)]
#[error("Circuit breaker is {state}")]
pub struct CircuitBreakerError {
    /// Breaker state reported to the caller (also rendered in the error message).
    state: CircuitBreakerState,
}

impl CircuitBreakerError {
    /// Wraps the state to report.
    pub fn new(state: CircuitBreakerState) -> Self {
        CircuitBreakerError { state }
    }

    /// Returns the state carried by this error.
    pub fn state(&self) -> CircuitBreakerState {
        let Self { state } = self;
        *state
    }
}
/// Tuning knobs for a circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures while `Closed` that trip the breaker to `Open`.
    pub failure_threshold: u64,
    /// Consecutive successes while `HalfOpen` required to close the breaker again.
    pub success_threshold: u64,
    /// Milliseconds the breaker stays `Open` before a call may move it to `HalfOpen`.
    pub timeout_ms: u64,
    /// Maximum number of recent call outcomes kept in the sliding window.
    pub window_size: usize,
}

impl Default for CircuitBreakerConfig {
    /// 5 failures to open, 3 successes to close, a 30 s open timeout and a 100-entry window.
    fn default() -> Self {
        CircuitBreakerConfig {
            failure_threshold: 5,
            success_threshold: 3,
            timeout_ms: 30_000,
            window_size: 100,
        }
    }
}
/// Circuit breaker whose mutable parts are individually shared behind `Arc`s.
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current position of the breaker.
    state: Arc<RwLock<CircuitBreakerState>>,
    /// Failure streak used to decide when a closed breaker opens.
    consecutive_failures: Arc<AtomicU64>,
    /// Success streak used to decide when a half-open breaker closes.
    consecutive_successes: Arc<AtomicU64>,
    /// When `state` last changed (construction time initially).
    last_state_change: Arc<RwLock<Instant>>,
    /// Thresholds and timeout; never mutated after construction.
    config: Arc<CircuitBreakerConfig>,
    /// Recent call outcomes, oldest first; `true` marks a failure.
    failure_window: Arc<RwLock<Vec<bool>>>,
}
impl CircuitBreaker {
pub fn new(config: CircuitBreakerConfig) -> Self {
Self {
state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
consecutive_failures: Arc::new(AtomicU64::new(0)),
consecutive_successes: Arc::new(AtomicU64::new(0)),
last_state_change: Arc::new(RwLock::new(Instant::now())),
config: Arc::new(config),
failure_window: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn state(&self) -> CircuitBreakerState {
*self.state.read().await
}
pub async fn record_success(&self) {
let mut state = self.state.write().await;
let config = &self.config;
match *state {
CircuitBreakerState::Closed => {
self.consecutive_failures.store(0, Ordering::Relaxed);
self.consecutive_successes.fetch_add(1, Ordering::Relaxed);
}
CircuitBreakerState::HalfOpen => {
let successes = self.consecutive_successes.fetch_add(1, Ordering::Relaxed) + 1;
if successes >= config.success_threshold {
*state = CircuitBreakerState::Closed;
*self.last_state_change.write().await = Instant::now();
self.consecutive_failures.store(0, Ordering::Relaxed);
}
}
CircuitBreakerState::Open => {
}
}
self.update_failure_window(false).await;
}
pub async fn record_failure(&self) {
let mut state = self.state.write().await;
let config = &self.config;
match *state {
CircuitBreakerState::Closed => {
let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
if failures >= config.failure_threshold {
*state = CircuitBreakerState::Open;
*self.last_state_change.write().await = Instant::now();
}
self.consecutive_successes.store(0, Ordering::Relaxed);
}
CircuitBreakerState::HalfOpen => {
*state = CircuitBreakerState::Open;
*self.last_state_change.write().await = Instant::now();
self.consecutive_successes.store(0, Ordering::Relaxed);
}
CircuitBreakerState::Open => {
}
}
self.update_failure_window(true).await;
}
pub async fn can_execute(&self) -> Result<(), CircuitBreakerError> {
let state = self.state.read().await;
let config = &self.config;
match *state {
CircuitBreakerState::Closed => Ok(()),
CircuitBreakerState::HalfOpen => {
let failures = self.failure_window.read().await.iter().filter(|&&f| f).count();
let total = self.failure_window.read().await.len();
if total > 0 {
let failure_rate = failures as f64 / total as f64;
if failure_rate > 0.5 {
return Err(CircuitBreakerError::new(CircuitBreakerState::Open));
}
}
Ok(())
}
CircuitBreakerState::Open => {
let elapsed = self.last_state_change.read().await.elapsed();
if elapsed.as_millis() >= config.timeout_ms as u128 {
drop(state);
let mut write_state = self.state.write().await;
*write_state = CircuitBreakerState::HalfOpen;
*self.last_state_change.write().await = Instant::now();
self.consecutive_failures.store(0, Ordering::Relaxed);
self.consecutive_successes.store(0, Ordering::Relaxed);
Ok(())
} else {
Err(CircuitBreakerError::new(CircuitBreakerState::Open))
}
}
}
}
async fn update_failure_window(&self, is_failure: bool) {
let mut window = self.failure_window.write().await;
window.push(is_failure);
if window.len() > self.config.window_size {
window.remove(0);
}
}
pub async fn status(&self) -> CircuitBreakerStatus {
let state = self.state.read().await;
let elapsed = self.last_state_change.read().await.elapsed();
CircuitBreakerStatus {
state: *state,
consecutive_failures: self.consecutive_failures.load(Ordering::Relaxed),
consecutive_successes: self.consecutive_successes.load(Ordering::Relaxed),
time_since_last_change: elapsed,
}
}
}
/// Point-in-time report of a circuit breaker, as returned by `CircuitBreaker::status`.
#[derive(Debug, Clone)]
pub struct CircuitBreakerStatus {
    /// State at the time of the report.
    pub state: CircuitBreakerState,
    /// Current failure streak.
    pub consecutive_failures: u64,
    /// Current success streak.
    pub consecutive_successes: u64,
    /// Time elapsed since the breaker last changed state.
    pub time_since_last_change: Duration,
}
/// Combines pool metrics and a circuit breaker into a bounded-time health verdict.
#[derive(Debug)]
pub struct HealthChecker {
    /// Pool counters the verdict is derived from.
    metrics: PoolHealthMetrics,
    /// Breaker whose state also feeds into the verdict.
    circuit_breaker: CircuitBreaker,
    /// Upper bound on how long a single check may run.
    check_timeout: Duration,
}
impl HealthChecker {
pub fn new(check_timeout_ms: u64) -> Self {
Self {
metrics: PoolHealthMetrics::new(),
circuit_breaker: CircuitBreaker::new(CircuitBreakerConfig::default()),
check_timeout: Duration::from_millis(check_timeout_ms),
}
}
pub async fn check(&self) -> HealthCheckResult {
let check_future = self.perform_check();
let result = tokio::time::timeout(self.check_timeout, check_future).await;
match result {
Ok(check_result) => {
self.metrics.update_health_check_time().await;
check_result
}
Err(_) => {
HealthCheckResult {
status: HealthStatus::Degraded("健康检查超时".to_string()),
latency: self.check_timeout,
details: "检查超时".to_string(),
recommendations: vec!["健康检查超时,建议优化检查逻辑".to_string()],
}
}
}
}
async fn perform_check(&self) -> HealthCheckResult {
let start = Instant::now();
let mut recommendations = Vec::new();
let cb_status = self.circuit_breaker.status().await;
let snapshot = self.metrics.snapshot();
let status = if snapshot.failed > snapshot.created.saturating_sub(10) && snapshot.created > 10 {
recommendations.push("检查数据库连接配置是否正确".to_string());
recommendations.push("验证网络连接是否稳定".to_string());
HealthStatus::Unhealthy(format!(
"连接失败率过高: {}/{} ({:.1}%)",
snapshot.failed,
snapshot.created,
snapshot.created as f64 / (snapshot.created + snapshot.failed) as f64 * 100.0
))
} else if snapshot.total == 0 {
HealthStatus::Unhealthy("无可用连接".to_string())
} else if cb_status.state == CircuitBreakerState::Open {
recommendations.push("等待熔断器恢复".to_string());
HealthStatus::Degraded("熔断器已打开,请求被拒绝".to_string())
} else if snapshot.active >= snapshot.total && snapshot.waiting > 10 {
recommendations.push("考虑增加连接池大小".to_string());
recommendations.push("检查是否有连接泄露".to_string());
HealthStatus::Degraded(format!(
"连接池已满: {}/{} 活跃, {} 等待",
snapshot.active, snapshot.total, snapshot.waiting
))
} else if snapshot.idle == 0 && snapshot.active > 0 {
recommendations.push("考虑增加最小连接数".to_string());
HealthStatus::Degraded("无空闲连接".to_string())
} else {
HealthStatus::Healthy
};
self.metrics.update_health_check_time().await;
let details = format!(
"连接池: total={}, active={}, idle={}, waiting={}\n\
统计: created={}, failed={}, closed={}\n\
熔断器: {:?} ({} 次失败, {} 次成功)",
snapshot.total,
snapshot.active,
snapshot.idle,
snapshot.waiting,
snapshot.created,
snapshot.failed,
snapshot.closed,
cb_status.state,
cb_status.consecutive_failures,
cb_status.consecutive_successes
);
HealthCheckResult {
status,
latency: start.elapsed(),
details,
recommendations,
}
}
pub fn metrics(&self) -> &PoolHealthMetrics {
&self.metrics
}
pub fn circuit_breaker(&self) -> &CircuitBreaker {
&self.circuit_breaker
}
}
/// Periodically nudges a health checker's circuit breaker and health checks.
#[derive(Debug)]
pub struct AutoRecoverer {
    /// Checker whose breaker and health check are driven on each tick.
    health_checker: Arc<HealthChecker>,
    /// Time between ticks.
    check_interval: Duration,
    /// Run flag: set to 1 by `start`, reset to 0 by `stop`.
    running: Arc<AtomicU64>,
}
impl AutoRecoverer {
pub fn new(health_checker: Arc<HealthChecker>, check_interval_ms: u64) -> Self {
Self {
health_checker,
check_interval: Duration::from_millis(check_interval_ms),
running: Arc::new(AtomicU64::new(0)),
}
}
pub async fn start(&self) {
if self
.running
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return; }
let mut interval = time::interval(self.check_interval);
loop {
interval.tick().await;
if let Ok(()) = self.health_checker.circuit_breaker().can_execute().await {
let _ = self.health_checker.circuit_breaker().record_success().await;
} else {
let health_result = self.health_checker.check().await;
if matches!(health_result.status, HealthStatus::Healthy) {
}
}
}
}
pub fn stop(&self) {
self.running.store(0, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_state_display_and_error_message() {
        assert_eq!(CircuitBreakerState::Closed.to_string(), "closed");
        assert_eq!(CircuitBreakerState::HalfOpen.to_string(), "half-open");
        assert_eq!(CircuitBreakerState::Open.to_string(), "open");
        let err = CircuitBreakerError::new(CircuitBreakerState::HalfOpen);
        assert_eq!(err.state(), CircuitBreakerState::HalfOpen);
        assert_eq!(err.to_string(), "Circuit breaker is half-open");
    }

    #[tokio::test]
    async fn test_circuit_breaker_closed_to_open() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 3,
            success_threshold: 2,
            timeout_ms: 1000,
            window_size: 10,
        });
        assert_eq!(breaker.state().await, CircuitBreakerState::Closed);
        for _ in 0..3 {
            breaker.record_failure().await;
        }
        assert_eq!(breaker.state().await, CircuitBreakerState::Open);
    }

    #[tokio::test]
    async fn test_circuit_breaker_open_rejects_before_timeout() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 1,
            success_threshold: 1,
            timeout_ms: 60_000,
            window_size: 10,
        });
        breaker.record_failure().await;
        let err = breaker.can_execute().await.unwrap_err();
        assert_eq!(err.state(), CircuitBreakerState::Open);
        assert_eq!(breaker.state().await, CircuitBreakerState::Open);
    }

    #[tokio::test]
    async fn test_circuit_breaker_half_open_recovery() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 2,
            timeout_ms: 100,
            window_size: 10,
        });
        breaker.record_failure().await;
        breaker.record_failure().await;
        assert_eq!(breaker.state().await, CircuitBreakerState::Open);
        tokio::time::sleep(Duration::from_millis(150)).await;
        let _ = breaker.can_execute().await;
        assert_eq!(breaker.state().await, CircuitBreakerState::HalfOpen);
        breaker.record_success().await;
        breaker.record_success().await;
        assert_eq!(breaker.state().await, CircuitBreakerState::Closed);
    }

    #[tokio::test]
    async fn test_half_open_failure_reopens() {
        let breaker = CircuitBreaker::new(CircuitBreakerConfig {
            failure_threshold: 1,
            success_threshold: 2,
            timeout_ms: 10,
            window_size: 10,
        });
        breaker.record_failure().await;
        tokio::time::sleep(Duration::from_millis(30)).await;
        assert!(breaker.can_execute().await.is_ok());
        assert_eq!(breaker.state().await, CircuitBreakerState::HalfOpen);
        breaker.record_failure().await;
        assert_eq!(breaker.state().await, CircuitBreakerState::Open);
    }

    #[tokio::test]
    async fn test_health_metrics() {
        let metrics = PoolHealthMetrics::new();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total, 0);
        assert_eq!(snapshot.active, 0);
        assert_eq!(snapshot.idle, 0);
        metrics.record_connection_created();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total, 1);
        assert_eq!(snapshot.active, 1);
        metrics.increment_active();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.active, 2);
        metrics.decrement_active();
        metrics.increment_idle().await;
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.active, 1);
        assert_eq!(snapshot.idle, 1);
        metrics.record_connection_closed();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total, 0);
    }

    #[tokio::test]
    async fn test_health_checker() {
        let checker = HealthChecker::new(1000);
        let result = checker.check().await;
        assert!(result.latency < Duration::from_secs(1));
        // A brand-new checker has recorded no connections, so the pool is reported unusable
        // and no recommendations are attached to that verdict.
        assert_eq!(result.status, HealthStatus::Unhealthy("无可用连接".to_string()));
        assert!(result.recommendations.is_empty());
        assert!(result.details.contains("total=0"));
    }
}