use std::{
collections::VecDeque,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
/// Point-in-time health report for a single subgraph.
///
/// Values are filled in by `SubgraphHealthChecker::check_subgraph`; the type
/// derives serde's `Serialize`/`Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubgraphHealthStatus {
/// Subgraph name, copied from the `SubgraphConfig` that was probed.
pub name: String,
/// `true` when the probe request completed with HTTP status 200.
pub available: bool,
/// Milliseconds elapsed between starting the probe and receiving its
/// response or error.
pub latency_ms: f64,
/// UTC time, formatted as RFC 3339, at which this status was assembled.
pub last_check: String,
/// Result of `RollingErrorWindow::error_count` for this subgraph, read
/// right after the probe outcome was recorded.
pub error_count_last_60s: u32,
/// Result of `RollingErrorWindow::error_rate_percent` for this subgraph
/// (errors divided by total checks, times 100).
pub error_rate_percent: f64,
}
/// Rolling tally of successful and failed checks, held as a queue of
/// time-stamped buckets.
///
/// The queue sits behind a `Mutex`, which is why every method of the type
/// takes `&self` rather than `&mut self`.
#[derive(Debug)]
pub struct RollingErrorWindow {
// Oldest bucket at the front, newest at the back.
buckets: Mutex<VecDeque<ErrorBucket>>,
}
/// Counters for the checks that were grouped under one timestamp.
#[derive(Debug, Clone)]
struct ErrorBucket {
// Moment the bucket was created; used to decide whether it is still
// inside a look-back period.
timestamp: Instant,
// Number of failed checks counted in this bucket.
errors: u32,
// Number of all checks (successful and failed) counted in this bucket.
total: u32,
}
impl RollingErrorWindow {
pub fn new() -> Self {
Self {
buckets: Mutex::new(VecDeque::with_capacity(6)),
}
}
pub fn record_success(&self) {
let mut buckets = self.buckets.lock().unwrap();
if let Some(bucket) = buckets.back_mut() {
bucket.total += 1;
} else {
buckets.push_back(ErrorBucket {
timestamp: Instant::now(),
errors: 0,
total: 1,
});
}
}
pub fn record_error(&self) {
let mut buckets = self.buckets.lock().unwrap();
if let Some(bucket) = buckets.back_mut() {
bucket.errors += 1;
bucket.total += 1;
} else {
buckets.push_back(ErrorBucket {
timestamp: Instant::now(),
errors: 1,
total: 1,
});
}
}
pub fn error_count(&self) -> u32 {
let buckets = self.buckets.lock().unwrap();
let now = Instant::now();
buckets
.iter()
.filter(|b| now.duration_since(b.timestamp) < Duration::from_secs(60))
.map(|b| b.errors)
.sum()
}
pub fn error_rate_percent(&self) -> f64 {
let buckets = self.buckets.lock().unwrap();
let now = Instant::now();
let recent: Vec<_> = buckets
.iter()
.filter(|b| now.duration_since(b.timestamp) < Duration::from_secs(300))
.collect();
if recent.is_empty() {
return 0.0;
}
let total_errors: u32 = recent.iter().map(|b| b.errors).sum();
let total_checks: u32 = recent.iter().map(|b| b.total).sum();
if total_checks == 0 {
0.0
} else {
(total_errors as f64 / total_checks as f64) * 100.0
}
}
fn cleanup(&self) {
let mut buckets = self.buckets.lock().unwrap();
let now = Instant::now();
while let Some(front) = buckets.front() {
if now.duration_since(front.timestamp) > Duration::from_secs(300) {
buckets.pop_front();
} else {
break;
}
}
}
}
impl Default for RollingErrorWindow {
fn default() -> Self {
Self::new()
}
}
/// Probes a fixed list of subgraph endpoints and keeps the statuses from the
/// latest round of checks available for readers.
pub struct SubgraphHealthChecker {
// Subgraphs to probe; a round of checks visits them in this order.
subgraphs: Vec<SubgraphConfig>,
// Client used to send the probe requests.
http_client: reqwest::Client,
// One rolling error window per subgraph, keyed by subgraph name.
error_windows: Arc<Mutex<std::collections::HashMap<String, RollingErrorWindow>>>,
// Statuses from the most recently completed round of checks; empty until
// the first round finishes.
status_cache: Arc<Mutex<Vec<SubgraphHealthStatus>>>,
}
/// Identifies one subgraph that `SubgraphHealthChecker` should probe.
#[derive(Debug, Clone)]
pub struct SubgraphConfig {
/// Name used in reported statuses, in log fields, and as the key of the
/// subgraph's error window.
pub name: String,
/// Address the health probe is sent to with an HTTP POST.
pub endpoint: String,
}
impl SubgraphHealthChecker {
pub fn new(subgraphs: Vec<SubgraphConfig>) -> Self {
let mut error_windows = std::collections::HashMap::new();
for config in &subgraphs {
error_windows.insert(config.name.clone(), RollingErrorWindow::new());
}
Self {
subgraphs,
http_client: reqwest::Client::new(),
error_windows: Arc::new(Mutex::new(error_windows)),
status_cache: Arc::new(Mutex::new(Vec::new())),
}
}
async fn check_subgraph(&self, config: &SubgraphConfig) -> SubgraphHealthStatus {
let start = Instant::now();
let query = serde_json::json!({
"query": "{ __typename }"
});
let result = self
.http_client
.post(&config.endpoint)
.header("Content-Type", "application/json")
.json(&query)
.timeout(Duration::from_secs(2))
.send()
.await;
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
let available = matches!(result, Ok(ref resp) if resp.status() == 200);
let (error_count, error_rate) = {
let windows = self.error_windows.lock().unwrap();
let window = windows.get(&config.name).expect("Window should exist for subgraph");
if available {
window.record_success();
} else {
if let Ok(resp) = &result {
warn!(
subgraph = %config.name,
status = %resp.status(),
latency_ms = latency_ms,
"Subgraph returned non-200 status"
);
} else if let Err(e) = &result {
warn!(
subgraph = %config.name,
error = %e,
latency_ms = latency_ms,
"Subgraph health check failed"
);
}
window.record_error();
}
(window.error_count(), window.error_rate_percent())
};
SubgraphHealthStatus {
name: config.name.clone(),
available,
latency_ms,
last_check: Utc::now().to_rfc3339(),
error_count_last_60s: error_count,
error_rate_percent: error_rate,
}
}
pub async fn run_background_checks(self: Arc<Self>) {
info!("Starting federation health check background task");
loop {
let mut statuses = Vec::new();
for config in &self.subgraphs {
let status = self.check_subgraph(config).await;
debug!(
subgraph = %status.name,
available = status.available,
latency_ms = status.latency_ms,
"Subgraph health check result"
);
statuses.push(status);
}
{
let mut cache = self.status_cache.lock().unwrap();
*cache = statuses;
}
{
let windows = self.error_windows.lock().unwrap();
for window in windows.values() {
window.cleanup();
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
pub fn get_cached_statuses(&self) -> Vec<SubgraphHealthStatus> {
self.status_cache.lock().unwrap().iter().cloned().collect()
}
pub fn get_overall_status(&self) -> String {
let statuses = self.get_cached_statuses();
if statuses.is_empty() {
return "unknown".to_string();
}
if statuses.iter().all(|s| s.available) {
"healthy".to_string()
} else if statuses.iter().any(|s| s.available) {
"degraded".to_string()
} else {
"unhealthy".to_string()
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A window with nothing recorded reports zero errors and a zero rate.
    #[test]
    fn test_rolling_error_window_creation() {
        let fresh = RollingErrorWindow::new();
        let count = fresh.error_count();
        let rate = fresh.error_rate_percent();
        assert_eq!(count, 0);
        assert_eq!(rate, 0.0);
    }

    /// Successes alone never show up as errors.
    #[test]
    fn test_rolling_error_window_success() {
        let tracker = RollingErrorWindow::new();
        for _ in 0..2 {
            tracker.record_success();
        }
        let count = tracker.error_count();
        let rate = tracker.error_rate_percent();
        assert_eq!(count, 0);
        assert_eq!(rate, 0.0);
    }

    /// Two successes followed by one error give one error and a rate of
    /// about one third.
    #[test]
    fn test_rolling_error_window_mixed() {
        let tracker = RollingErrorWindow::new();
        for _ in 0..2 {
            tracker.record_success();
        }
        tracker.record_error();
        let count = tracker.error_count();
        assert_eq!(count, 1);
        let deviation = (tracker.error_rate_percent() - 33.33).abs();
        assert!(deviation < 0.1);
    }

    /// The JSON form of a status contains the subgraph name and the
    /// availability flag.
    #[test]
    fn test_health_status_serialization() {
        let encoded = serde_json::to_string(&SubgraphHealthStatus {
            name: "test-subgraph".to_string(),
            available: true,
            latency_ms: 25.5,
            last_check: Utc::now().to_rfc3339(),
            error_count_last_60s: 0,
            error_rate_percent: 0.0,
        })
        .unwrap();
        assert!(encoded.contains("test-subgraph"));
        assert!(encoded.contains("true"));
    }
}