#[cfg(feature = "wal-recovery")]
use crate::backend::l2::L2Backend;
#[cfg(feature = "wal-recovery")]
use crate::recovery::wal::WalEntry;
#[cfg(feature = "wal-recovery")]
use crate::recovery::wal::WalManager;
#[cfg(feature = "wal-recovery")]
use crate::recovery::wal::WalReplayableBackend;
#[cfg(feature = "wal-recovery")]
pub use crate::recovery::wal::WalReplayableBackend as WalReplayableBackendTrait;
#[cfg(feature = "wal-recovery")]
use std::sync::Arc;
#[cfg(feature = "wal-recovery")]
use std::time::{Duration, Instant};
#[cfg(feature = "wal-recovery")]
use tokio::sync::RwLock;
#[cfg(feature = "wal-recovery")]
use tokio::time::timeout;
/// A backend that [`HealthChecker`] can probe for liveness.
#[cfg(feature = "wal-recovery")]
// `async fn` in a public trait trips `async_fn_in_trait` because implementors cannot promise a
// `Send` future to callers; the lint is accepted deliberately here.
#[allow(async_fn_in_trait)]
pub trait HealthCheckableBackend: Clone + Send + Sync + 'static {
    /// Command timeout of the backend in milliseconds (per the `_ms` suffix).
    fn command_timeout_ms(&self) -> u64;

    /// Sends a single liveness probe to the backend.
    async fn ping(&self) -> crate::error::Result<()>;
}
/// Lets the WAL manager replay entries into the L2 backend.
#[cfg(feature = "wal-recovery")]
impl WalReplayableBackend for L2Backend {
    /// Forwards to the inherent `L2Backend::pipeline_replay`.
    async fn pipeline_replay(&self, entries: Vec<WalEntry>) -> crate::error::Result<()> {
        Self::pipeline_replay(self, entries).await
    }
}
/// Exposes the L2 backend's own ping and timeout to the health checker.
#[cfg(feature = "wal-recovery")]
impl HealthCheckableBackend for L2Backend {
    /// Forwards to the inherent `L2Backend::command_timeout_ms`.
    fn command_timeout_ms(&self) -> u64 {
        Self::command_timeout_ms(self)
    }

    /// Forwards to the inherent `L2Backend::ping`.
    async fn ping(&self) -> crate::error::Result<()> {
        Self::ping(self).await
    }
}
#[cfg(feature = "wal-recovery")]
/// Health of the L2 backend as driven by [`HealthChecker`].
///
/// `Default` is `Healthy`, matching the definition used when `wal-recovery` is disabled, so
/// `HealthState::default()` compiles in both configurations.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum HealthState {
    /// The backend answers pings.
    #[default]
    Healthy,
    /// Pings are failing; `since` is when degradation began and `failure_count` counts
    /// consecutive failed checks.
    Degraded { since: Instant, failure_count: u32 },
    /// Pings succeed again; `success_count` counts consecutive successful checks.
    Recovering { since: Instant, success_count: u32 },
    /// The WAL is being replayed into the backend; `since` is when the replay started.
    WalReplaying { since: Instant },
}
/// Periodically pings an L2 backend, advances the shared [`HealthState`], and replays the
/// WAL into the backend once it has recovered.
#[cfg(feature = "wal-recovery")]
pub struct HealthChecker<T>
where
    T: HealthCheckableBackend,
{
    /// Backend that is pinged and, after recovery, receives the WAL replay.
    l2: Arc<T>,
    /// Health state shared (via `Arc`) with whoever constructed the checker.
    state: Arc<RwLock<HealthState>>,
    /// WAL manager whose entries are replayed into `l2`.
    wal: Arc<WalManager>,
    /// Label used in log lines and for the health metric.
    service_name: String,
    /// Upper bound, in milliseconds, for each ping.
    command_timeout_ms: u64,
}
#[cfg(feature = "wal-recovery")]
impl<T: HealthCheckableBackend + WalReplayableBackend> HealthChecker<T> {
    /// Creates a checker for `l2` that publishes its verdicts into `state`.
    ///
    /// * `wal` – WAL manager replayed into `l2` once the backend has recovered.
    /// * `service_name` – label used in log lines and for the health metric.
    /// * `command_timeout_ms` – upper bound, in milliseconds, for each ping.
    pub fn new(
        l2: Arc<T>,
        state: Arc<RwLock<HealthState>>,
        wal: Arc<WalManager>,
        service_name: String,
        command_timeout_ms: u64,
    ) -> Self {
        Self {
            l2,
            state,
            wal,
            service_name,
            command_timeout_ms,
        }
    }

    /// Runs the health-check loop; the returned future never completes.
    ///
    /// Every 5 seconds the backend is pinged and the shared [`HealthState`] is advanced:
    ///
    /// * `Healthy` → `Degraded` on a failed ping.
    /// * `Degraded` → `Recovering` on a successful ping; further failures increment
    ///   `failure_count`, which stops growing at 3.
    /// * `Recovering` → `Degraded` on a failed ping; each success increments
    ///   `success_count`, and a success observed while it is already 3 triggers a WAL
    ///   replay (`WalReplaying`), ending in `Healthy` on success or `Recovering` on failure.
    pub async fn start(self) {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        // With the default `Burst` behaviour, every tick missed during a long WAL replay or a
        // slow ping would fire back-to-back afterwards; `Delay` resumes the normal cadence.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            interval.tick().await;
            let is_healthy = self.probe().await;
            self.advance(is_healthy).await;
        }
    }

    /// Pings the backend once, bounded by `command_timeout_ms`.
    ///
    /// Returns `true` only when the ping completes in time and succeeds; errors and
    /// timeouts are logged at debug level and reported as `false`.
    async fn probe(&self) -> bool {
        let deadline = Duration::from_millis(self.command_timeout_ms);
        match timeout(deadline, self.l2.ping()).await {
            Ok(Ok(())) => {
                tracing::trace!("服务 {} ping成功", self.service_name);
                true
            }
            Ok(Err(e)) => {
                tracing::debug!("服务 {} ping失败: {}", self.service_name, e);
                false
            }
            Err(_) => {
                tracing::debug!(
                    "服务 {} ping超时 ({}ms)",
                    self.service_name,
                    self.command_timeout_ms
                );
                false
            }
        }
    }

    /// Applies one probe result to the shared state machine and publishes the outcome.
    async fn advance(&self, is_healthy: bool) {
        let current_state = *self.state.read().await;
        tracing::debug!(
            "服务 {} 健康检查: is_healthy={}, 当前状态={:?}, 即将获取写锁",
            self.service_name,
            is_healthy,
            current_state
        );
        let mut state_guard = self.state.write().await;
        tracing::debug!(
            "服务 {} 获取写锁成功,当前状态={:?}",
            self.service_name,
            *state_guard
        );
        let new_state = match *state_guard {
            HealthState::Healthy => {
                if !is_healthy {
                    tracing::warn!("服务 {} L2已降级", self.service_name);
                    HealthState::Degraded {
                        since: Instant::now(),
                        failure_count: 1,
                    }
                } else {
                    tracing::debug!("服务 {} 保持健康状态", self.service_name);
                    HealthState::Healthy
                }
            }
            HealthState::Degraded {
                since,
                failure_count,
            } => {
                tracing::debug!(
                    "服务 {} Degraded状态检查: is_healthy={}, failure_count={}, since={:?}",
                    self.service_name,
                    is_healthy,
                    failure_count,
                    since
                );
                if is_healthy {
                    tracing::info!(
                        "服务 {} L2正在恢复 (failure_count={})",
                        self.service_name,
                        failure_count
                    );
                    tracing::debug!(
                        "服务 {} 状态转换: Degraded -> Recovering",
                        self.service_name
                    );
                    HealthState::Recovering {
                        since: Instant::now(),
                        success_count: 1,
                    }
                } else if failure_count >= 3 {
                    // The counter stops growing at 3 so it cannot overflow during long outages.
                    tracing::debug!(
                        "服务 {} 保持降级状态 (failure_count={} >= 3)",
                        self.service_name,
                        failure_count
                    );
                    HealthState::Degraded {
                        since,
                        failure_count,
                    }
                } else {
                    tracing::debug!(
                        "服务 {} 增加失败计数: {} -> {}",
                        self.service_name,
                        failure_count,
                        failure_count + 1
                    );
                    HealthState::Degraded {
                        since,
                        failure_count: failure_count + 1,
                    }
                }
            }
            HealthState::Recovering {
                since,
                success_count,
            } => {
                if !is_healthy {
                    tracing::info!(
                        "服务 {} 恢复失败,回到降级状态 (success_count={})",
                        self.service_name,
                        success_count
                    );
                    HealthState::Degraded {
                        since: Instant::now(),
                        failure_count: 1,
                    }
                } else if success_count >= 3 {
                    tracing::info!(
                        "服务 {} 达到恢复条件,开始重放WAL (success_count={})",
                        self.service_name,
                        success_count
                    );
                    tracing::debug!(
                        "服务 {} 状态转换: Recovering -> WalReplaying",
                        self.service_name
                    );
                    // Expose `WalReplaying` to readers, then release the lock so they are not
                    // blocked for the whole duration of the replay.
                    *state_guard = HealthState::WalReplaying {
                        since: Instant::now(),
                    };
                    drop(state_guard);
                    let replayed = self.replay_wal().await;
                    // NOTE(review): other holders of `state` may have written it while the lock
                    // was released; such a write is overwritten by `publish` below. Confirm
                    // that this is intended.
                    state_guard = self.state.write().await;
                    replayed
                } else {
                    tracing::debug!(
                        "服务 {} 增加恢复计数: {} -> {}",
                        self.service_name,
                        success_count,
                        success_count + 1
                    );
                    HealthState::Recovering {
                        since,
                        success_count: success_count + 1,
                    }
                }
            }
            HealthState::WalReplaying { since } => {
                // Only the `Recovering` arm above enters this state, and it finishes the replay
                // before returning, so this is reached only if another owner of `state` set it.
                // `since` is kept so the state compares equal and no spurious change is
                // published on every tick.
                tracing::debug!("服务 {} 仍处于 WalReplaying 状态", self.service_name);
                HealthState::WalReplaying { since }
            }
        };
        self.publish(&mut state_guard, new_state, is_healthy);
    }

    /// Replays the WAL into the backend via `WalManager::replay_all` and returns the next
    /// state: `Healthy` on success, or `Recovering` with `success_count` reset to 1 on failure,
    /// so the backend has to pass further checks before the next attempt.
    async fn replay_wal(&self) -> HealthState {
        match self.wal.replay_all(&self.l2).await {
            Ok(count) => {
                tracing::info!(
                    "服务 {} WAL已重放: {} 条目,状态转换: WalReplaying -> Healthy",
                    self.service_name,
                    count
                );
                HealthState::Healthy
            }
            Err(e) => {
                tracing::error!(
                    "服务 {} WAL重放失败: {},状态转换: WalReplaying -> Recovering",
                    self.service_name,
                    e
                );
                HealthState::Recovering {
                    since: Instant::now(),
                    success_count: 1,
                }
            }
        }
    }

    /// Writes `new_state` into `current` and updates the health metric when it changed.
    ///
    /// Metric codes: `1` = healthy, `2` = recovering or replaying the WAL, `0` = degraded.
    fn publish(&self, current: &mut HealthState, new_state: HealthState, is_healthy: bool) {
        if *current == new_state {
            tracing::debug!(
                "服务 {} 健康状态未变更: {:?} (ping结果={})",
                self.service_name,
                *current,
                is_healthy
            );
            return;
        }
        tracing::info!(
            "服务 {} 健康状态变更: {:?} -> {:?}",
            self.service_name,
            *current,
            new_state
        );
        *current = new_state;
        let status_code = match new_state {
            HealthState::Healthy => 1,
            HealthState::Recovering { .. } | HealthState::WalReplaying { .. } => 2,
            HealthState::Degraded { .. } => 0,
        };
        crate::metrics::GLOBAL_METRICS.set_health(&self.service_name, status_code);
    }
}
/// Marker trait for backends accepted by the no-op [`HealthChecker`] built when the
/// `wal-recovery` feature is disabled; it declares no methods.
#[cfg(not(feature = "wal-recovery"))]
pub trait HealthCheckableBackend: Clone + Send + Sync + 'static {}
#[cfg(not(feature = "wal-recovery"))]
impl<T: HealthCheckableBackend> HealthCheckableBackend for T {}
#[cfg(not(feature = "wal-recovery"))]
/// Health of the L2 backend, defined with the same variants as the `wal-recovery` build so
/// the type exists in both configurations. Without the feature, the checker's `start` does
/// nothing, so nothing in this file moves the state away from its `Healthy` default.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum HealthState {
    /// The backend is considered reachable; this is the default.
    #[default]
    Healthy,
    /// Health checks are failing; `since` marks when degradation began.
    Degraded {
        since: std::time::Instant,
        failure_count: u32,
    },
    /// Health checks succeed again; `success_count` counts consecutive successes.
    Recovering {
        since: std::time::Instant,
        success_count: u32,
    },
    /// The WAL is being replayed; `since` marks when the replay started.
    WalReplaying {
        since: std::time::Instant,
    },
}
/// No-op stand-in for the health checker when `wal-recovery` is disabled.
///
/// Keeps the same fields and constructor shape as the feature-enabled checker so callers
/// compile unchanged in both configurations.
#[cfg(not(feature = "wal-recovery"))]
#[derive(Debug)]
// The fields are stored only for parity with the `wal-recovery` build; this configuration's
// `start` is a no-op and never reads them.
#[allow(dead_code)]
pub struct HealthChecker<T: HealthCheckableBackend> {
    // `std::sync::Arc` is spelled out because the file's `use std::sync::Arc` is compiled only
    // with `wal-recovery`; a bare `Arc` does not resolve in this configuration.
    l2: std::sync::Arc<T>,
    state: std::sync::Arc<tokio::sync::RwLock<HealthState>>,
    wal: std::sync::Arc<()>,
    service_name: String,
    command_timeout_ms: u64,
}
#[cfg(not(feature = "wal-recovery"))]
impl<T: HealthCheckableBackend> HealthChecker<T> {
    /// Builds the no-op checker.
    ///
    /// Arguments mirror the `wal-recovery` constructor, with `()` standing in for the WAL
    /// manager. `std::sync::Arc` is spelled out because the file's `Arc` import is compiled
    /// only with `wal-recovery`.
    pub fn new(
        l2: std::sync::Arc<T>,
        state: std::sync::Arc<tokio::sync::RwLock<HealthState>>,
        wal: std::sync::Arc<()>,
        service_name: String,
        command_timeout_ms: u64,
    ) -> Self {
        // Keep the caller's `wal` handle instead of allocating a fresh `Arc<()>`.
        Self {
            l2,
            state,
            wal,
            service_name,
            command_timeout_ms,
        }
    }

    /// Returns immediately: without `wal-recovery` no health-check loop runs, and `state` is
    /// left untouched.
    pub async fn start(self) {}
}