use fmt::{Display, Formatter};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
/// Source of `Instant`s used by [`HostHealthTracker`].
///
/// Abstracted behind a trait so an alternative time source (for example a
/// fake clock in tests) can be injected instead of the real system clock.
/// Implementors must be shareable across threads and own all their data.
pub trait Clock
where
    Self: Send + Sync + 'static,
{
    /// Returns the current instant according to this clock.
    fn now(&self) -> Instant;
}
/// [`Clock`] implementation that reads the real clock via [`Instant::now`].
///
/// Zero-sized and `Copy`, so it can be passed around freely.
#[derive(Default, Copy, Clone)]
pub struct SystemClock;

impl Clock for SystemClock {
    /// Delegates directly to [`Instant::now`].
    fn now(&self) -> Instant {
        Instant::now()
    }
}
/// Per-host failure state kept by [`HostHealthTracker`].
///
/// The default value (no failures, not offline) is what a host with no
/// history looks like.
#[derive(Debug, Default)]
struct HostRecord {
    /// Timestamps of recorded failures; pruned to the tracker's failure
    /// window whenever a new failure is recorded.
    failures: Vec<Instant>,
    /// When `Some`, the host is considered offline until this instant.
    offline_until: Option<Instant>,
}
pub struct HostHealthTracker<C: Clock> {
clock: C,
failure_threshold: usize,
failure_window: Duration,
offline_duration: Duration,
inner: Mutex<HashMap<String, HostRecord>>,
}
impl<C: Clock> HostHealthTracker<C> {
    /// Creates a tracker with no recorded history.
    ///
    /// * `clock` – time source used for every timestamp.
    /// * `failure_threshold` – number of failures within `failure_window` that
    ///   trips a host offline. `0` and `1` both trip on the first failure.
    /// * `failure_window` – failures older than this, measured from the time
    ///   of the failure being recorded, no longer count.
    /// * `offline_duration` – how long a tripped host stays offline.
    pub fn new(
        clock: C,
        failure_threshold: usize,
        failure_window: Duration,
        offline_duration: Duration,
    ) -> Self {
        Self {
            clock,
            failure_threshold,
            failure_window,
            offline_duration,
            inner: Mutex::new(HashMap::new()),
        }
    }

    /// Locks the per-host map, recovering the data if a previous holder
    /// panicked.
    ///
    /// Poisoning is logged and ignored: the map only holds per-host counters,
    /// so continuing with possibly stale entries is preferable to failing
    /// every later call.
    fn lock_records(&self) -> MutexGuard<'_, HashMap<String, HostRecord>> {
        self.inner.lock().unwrap_or_else(|poisoned| {
            tracing::warn!("HostHealthTracker mutex poisoned, recovering");
            poisoned.into_inner()
        })
    }

    /// Returns `now + duration`, clamped to (approximately) the latest
    /// representable `Instant` instead of panicking when the addition
    /// overflows (e.g. for `Duration::MAX`).
    fn saturating_deadline(now: Instant, duration: Duration) -> Instant {
        if let Some(deadline) = now.checked_add(duration) {
            return deadline;
        }
        // The full duration overflows, so greedily add ever-smaller chunks.
        // Each step size succeeds at most a couple of times before halving,
        // so this terminates quickly (bounded by the bit-width of `duration`).
        let mut deadline = now;
        let mut step = duration / 2;
        while !step.is_zero() {
            match deadline.checked_add(step) {
                Some(next) => deadline = next,
                None => step /= 2,
            }
        }
        deadline
    }

    /// Drops failures recorded more than `failure_window` before `now`.
    fn prune_old(&self, record: &mut HostRecord, now: Instant) {
        let window = self.failure_window;
        // `saturating_duration_since` makes the "timestamp in the future"
        // case (possible with a custom `Clock`) explicitly count as age zero.
        record
            .failures
            .retain(|&ts| now.saturating_duration_since(ts) <= window);
    }

    /// Returns the instant until which `host` is offline, or `None` if it is
    /// currently available.
    ///
    /// If the host's offline period has already elapsed, its record is
    /// discarded as a side effect, so its failure count restarts from zero.
    pub fn offline_until(&self, host: &str) -> Option<Instant> {
        let now = self.clock.now();
        let mut records = self.lock_records();
        let until = records.get(host)?.offline_until?;
        if now < until {
            return Some(until);
        }
        // Failures are always cleared when a host goes offline (tripped or
        // forced) and are not recorded while it is offline, so an expired
        // record is equivalent to a default one; remove it instead of leaving
        // an empty entry behind.
        records.remove(host);
        None
    }

    /// Records a successful request to `host`, forgetting its failure history
    /// and any offline period (including one set by `force_offline`).
    pub fn record_success(&self, host: &str) {
        self.lock_records().remove(host);
    }

    /// Records a failed request to `host`.
    ///
    /// Returns `Some(until)` when the host is offline after this call — either
    /// because it already was (in which case this failure is not counted) or
    /// because this failure brought the in-window count to
    /// `failure_threshold`. Returns `None` otherwise.
    pub fn record_failure(&self, host: &str) -> Option<Instant> {
        let now = self.clock.now();
        let mut records = self.lock_records();
        let record = records.entry(host.to_string()).or_default();
        if let Some(until) = record.offline_until {
            if now < until {
                return Some(until);
            }
            // The previous offline period has expired: start a fresh count.
            record.offline_until = None;
            record.failures.clear();
        }
        self.prune_old(record, now);
        record.failures.push(now);
        if record.failures.len() >= self.failure_threshold {
            let until = Self::saturating_deadline(now, self.offline_duration);
            record.offline_until = Some(until);
            // The count restarts after the offline period ends.
            record.failures.clear();
            return Some(until);
        }
        None
    }

    /// Marks `host` offline for `duration` from now, replacing any existing
    /// offline period and discarding its failure history.
    ///
    /// Returns the deadline. Durations too large to add to the current instant
    /// are clamped rather than causing a panic.
    pub fn force_offline(&self, host: &str, duration: Duration) -> Instant {
        let deadline = Self::saturating_deadline(self.clock.now(), duration);
        let mut records = self.lock_records();
        let record = records.entry(host.to_string()).or_default();
        record.failures.clear();
        record.offline_until = Some(deadline);
        deadline
    }

    /// Forgets everything recorded about `host`.
    pub fn reset_host(&self, host: &str) {
        self.lock_records().remove(host);
    }
}
pub fn global_tracker() -> &'static HostHealthTracker<SystemClock> {
static SCYLLA_TRACKER: Lazy<HostHealthTracker<SystemClock>> = Lazy::new(|| {
HostHealthTracker::new(
SystemClock::default(),
5,
Duration::from_secs(60),
Duration::from_secs(300),
)
});
&SCYLLA_TRACKER
}
/// Error describing a host that is offline until a given instant.
pub struct HostOffline {
    host: String,
    until: Instant,
}

impl HostOffline {
    /// Builds the error for `host`, which is offline until `until`.
    pub fn new(host: String, until: Instant) -> Self {
        HostOffline { until, host }
    }

    /// Name of the offline host.
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Instant until which the host is considered offline.
    pub fn until(&self) -> Instant {
        let Self { until, .. } = *self;
        until
    }
}
impl Display for HostOffline {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Host {} is offline for another {:?}",
self.host,
self.until
.checked_duration_since(Instant::now())
.unwrap_or_else(|| Duration::from_secs(0))
)
}
}
impl fmt::Debug for HostOffline {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"{{ host: {}, remaining: {:?} }}",
self.host,
self.until
.checked_duration_since(Instant::now())
.unwrap_or_else(|| Duration::from_secs(0))
)
}
}
impl Error for HostOffline {}