use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use tracing::{debug, warn};
use scirs2_core::metrics::{Counter, Gauge, Histogram};
use crate::StarResult;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MetricType {
Counter,
Gauge,
Histogram,
Summary,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDataPoint {
pub name: String,
pub value: f64,
pub timestamp: DateTime<Utc>,
pub tags: HashMap<String, String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum AlertSeverity {
Info,
Warning,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
pub name: String,
pub metric_name: String,
pub threshold: f64,
pub condition: AlertCondition,
pub severity: AlertSeverity,
pub message_template: String,
pub last_triggered: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertCondition {
GreaterThan,
LessThan,
Equal,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
Healthy,
Degraded,
Unhealthy,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheck {
pub component: String,
pub status: HealthStatus,
pub last_check: DateTime<Utc>,
pub message: Option<String>,
pub response_time_ms: Option<f64>,
}
pub struct MetricsCollector {
counters: Arc<RwLock<HashMap<String, Arc<Counter>>>>,
gauges: Arc<RwLock<HashMap<String, Arc<Gauge>>>>,
histograms: Arc<RwLock<HashMap<String, Arc<Histogram>>>>,
time_series: Arc<RwLock<HashMap<String, VecDeque<MetricDataPoint>>>>,
max_history: usize,
alerts: Arc<RwLock<Vec<Alert>>>,
health_checks: Arc<RwLock<HashMap<String, HealthCheck>>>,
stats: Arc<RwLock<MonitoringStatistics>>,
}
#[derive(Debug, Clone, Default)]
pub struct MonitoringStatistics {
pub metrics_collected: usize,
pub alerts_triggered: usize,
pub health_checks_performed: usize,
pub failed_health_checks: usize,
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
counters: Arc::new(RwLock::new(HashMap::new())),
gauges: Arc::new(RwLock::new(HashMap::new())),
histograms: Arc::new(RwLock::new(HashMap::new())),
time_series: Arc::new(RwLock::new(HashMap::new())),
max_history: 1000, alerts: Arc::new(RwLock::new(Vec::new())),
health_checks: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(RwLock::new(MonitoringStatistics::default())),
}
}
fn get_or_create_counter(&self, name: &str) -> Arc<Counter> {
let mut counters = self.counters.write().unwrap_or_else(|e| e.into_inner());
counters
.entry(name.to_string())
.or_insert_with(|| Arc::new(Counter::new(name.to_string())))
.clone()
}
fn get_or_create_gauge(&self, name: &str) -> Arc<Gauge> {
let mut gauges = self.gauges.write().unwrap_or_else(|e| e.into_inner());
gauges
.entry(name.to_string())
.or_insert_with(|| Arc::new(Gauge::new(name.to_string())))
.clone()
}
fn get_or_create_histogram(&self, name: &str) -> Arc<Histogram> {
let mut histograms = self.histograms.write().unwrap_or_else(|e| e.into_inner());
histograms
.entry(name.to_string())
.or_insert_with(|| Arc::new(Histogram::new(name.to_string())))
.clone()
}
pub fn increment_counter(&self, name: &str) -> StarResult<()> {
self.increment_counter_by(name, 1)
}
pub fn increment_counter_by(&self, name: &str, value: u64) -> StarResult<()> {
let counter = self.get_or_create_counter(name);
counter.add(value);
self.record_data_point(name, value as f64, HashMap::new());
Ok(())
}
pub fn set_gauge(&self, name: &str, value: f64) -> StarResult<()> {
let gauge = self.get_or_create_gauge(name);
gauge.set(value);
self.record_data_point(name, value, HashMap::new());
Ok(())
}
pub fn record_histogram(&self, name: &str, value: f64) -> StarResult<()> {
let histogram = self.get_or_create_histogram(name);
histogram.observe(value);
self.record_data_point(name, value, HashMap::new());
Ok(())
}
pub fn record_data_point(&self, name: &str, value: f64, tags: HashMap<String, String>) {
let data_point = MetricDataPoint {
name: name.to_string(),
value,
timestamp: Utc::now(),
tags,
};
let mut time_series = self.time_series.write().unwrap_or_else(|e| e.into_inner());
let series = time_series.entry(name.to_string()).or_default();
series.push_back(data_point);
if series.len() > self.max_history {
series.pop_front();
}
self.stats
.write()
.unwrap_or_else(|e| e.into_inner())
.metrics_collected += 1;
self.check_alerts_for_metric(name, value);
}
pub fn add_alert(&self, alert: Alert) {
debug!("Added alert: {}", alert.name);
self.alerts
.write()
.unwrap_or_else(|e| e.into_inner())
.push(alert);
}
fn check_alerts_for_metric(&self, metric_name: &str, value: f64) {
let mut alerts = self.alerts.write().unwrap_or_else(|e| e.into_inner());
for alert in alerts.iter_mut() {
if alert.metric_name != metric_name {
continue;
}
let triggered = match alert.condition {
AlertCondition::GreaterThan => value > alert.threshold,
AlertCondition::LessThan => value < alert.threshold,
AlertCondition::Equal => (value - alert.threshold).abs() < f64::EPSILON,
};
if triggered {
let now = Utc::now();
alert.last_triggered = Some(now);
warn!(
"Alert triggered: {} - {} {} {} (current: {})",
alert.name, metric_name, alert.condition, alert.threshold, value
);
self.stats
.write()
.unwrap_or_else(|e| e.into_inner())
.alerts_triggered += 1;
}
}
}
pub fn register_health_check(
&self,
component: &str,
status: HealthStatus,
message: Option<String>,
) {
let health_check = HealthCheck {
component: component.to_string(),
status,
last_check: Utc::now(),
message,
response_time_ms: None,
};
self.health_checks
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(component.to_string(), health_check);
self.stats
.write()
.unwrap_or_else(|e| e.into_inner())
.health_checks_performed += 1;
if status != HealthStatus::Healthy {
self.stats
.write()
.unwrap_or_else(|e| e.into_inner())
.failed_health_checks += 1;
}
}
pub fn overall_health(&self) -> HealthStatus {
let health_checks = self.health_checks.read().unwrap_or_else(|e| e.into_inner());
if health_checks.is_empty() {
return HealthStatus::Healthy;
}
let mut has_degraded = false;
for check in health_checks.values() {
match check.status {
HealthStatus::Unhealthy => return HealthStatus::Unhealthy,
HealthStatus::Degraded => has_degraded = true,
HealthStatus::Healthy => {}
}
}
if has_degraded {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
}
}
pub fn get_time_series(&self, name: &str) -> Vec<MetricDataPoint> {
self.time_series
.read()
.unwrap_or_else(|e| e.into_inner())
.get(name)
.map(|series| series.iter().cloned().collect())
.unwrap_or_default()
}
pub fn get_metric_summary(&self, name: &str) -> Option<MetricSummary> {
let series = self.get_time_series(name);
if series.is_empty() {
return None;
}
let values: Vec<f64> = series.iter().map(|dp| dp.value).collect();
let sum: f64 = values.iter().sum();
let count = values.len();
let mean = sum / count as f64;
let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
let mut sorted = values.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p50 = sorted[count / 2];
let p95 = sorted[(count as f64 * 0.95) as usize];
let p99 = sorted[(count as f64 * 0.99) as usize];
Some(MetricSummary {
name: name.to_string(),
count,
sum,
mean,
min,
max,
p50,
p95,
p99,
})
}
pub fn export_prometheus(&self) -> String {
let mut output = String::new();
for (name, series) in self
.time_series
.read()
.unwrap_or_else(|e| e.into_inner())
.iter()
{
if let Some(latest) = series.back() {
output.push_str(&format!("# TYPE {} gauge\n", name));
output.push_str(&format!("{} {}\n", name, latest.value));
}
}
output
}
pub fn statistics(&self) -> MonitoringStatistics {
self.stats.read().unwrap_or_else(|e| e.into_inner()).clone()
}
pub fn clear_time_series(&self) {
self.time_series
.write()
.unwrap_or_else(|e| e.into_inner())
.clear();
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSummary {
pub name: String,
pub count: usize,
pub sum: f64,
pub mean: f64,
pub min: f64,
pub max: f64,
pub p50: f64,
pub p95: f64,
pub p99: f64,
}
impl std::fmt::Display for AlertCondition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::GreaterThan => write!(f, ">"),
Self::LessThan => write!(f, "<"),
Self::Equal => write!(f, "="),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_counter() {
let collector = MetricsCollector::new();
collector.increment_counter("test_counter").unwrap();
collector.increment_counter("test_counter").unwrap();
collector.increment_counter_by("test_counter", 5).unwrap();
let series = collector.get_time_series("test_counter");
assert_eq!(series.len(), 3);
}
#[test]
fn test_gauge() {
let collector = MetricsCollector::new();
collector.set_gauge("memory_usage", 1024.0).unwrap();
collector.set_gauge("memory_usage", 2048.0).unwrap();
let series = collector.get_time_series("memory_usage");
assert_eq!(series.len(), 2);
assert_eq!(series.last().unwrap().value, 2048.0);
}
#[test]
fn test_histogram() {
let collector = MetricsCollector::new();
for i in 1..=100 {
collector.record_histogram("latency", i as f64).unwrap();
}
let summary = collector.get_metric_summary("latency").unwrap();
assert_eq!(summary.count, 100);
assert_eq!(summary.min, 1.0);
assert_eq!(summary.max, 100.0);
}
#[test]
fn test_alerts() {
let collector = MetricsCollector::new();
let alert = Alert {
name: "high_latency".to_string(),
metric_name: "query_latency".to_string(),
threshold: 100.0,
condition: AlertCondition::GreaterThan,
severity: AlertSeverity::Warning,
message_template: "Query latency exceeded threshold".to_string(),
last_triggered: None,
};
collector.add_alert(alert);
collector.record_histogram("query_latency", 150.0).unwrap();
let stats = collector.statistics();
assert_eq!(stats.alerts_triggered, 1);
}
#[test]
fn test_health_checks() {
let collector = MetricsCollector::new();
collector.register_health_check("database", HealthStatus::Healthy, None);
collector.register_health_check("cache", HealthStatus::Healthy, None);
assert_eq!(collector.overall_health(), HealthStatus::Healthy);
collector.register_health_check(
"storage",
HealthStatus::Degraded,
Some("High latency".to_string()),
);
assert_eq!(collector.overall_health(), HealthStatus::Degraded);
collector.register_health_check(
"network",
HealthStatus::Unhealthy,
Some("Connection lost".to_string()),
);
assert_eq!(collector.overall_health(), HealthStatus::Unhealthy);
}
#[test]
fn test_metric_summary() {
let collector = MetricsCollector::new();
for i in 1..=100 {
collector.record_histogram("test_metric", i as f64).unwrap();
}
let summary = collector.get_metric_summary("test_metric").unwrap();
assert_eq!(summary.count, 100);
assert_eq!(summary.mean, 50.5);
assert_eq!(summary.min, 1.0);
assert_eq!(summary.max, 100.0);
}
#[test]
fn test_time_series_limit() {
let collector = MetricsCollector::new();
for i in 1..=20 {
collector.set_gauge("limited_metric", i as f64).unwrap();
}
let series = collector.get_time_series("limited_metric");
assert!(series.len() <= 20); }
#[test]
fn test_prometheus_export() {
let collector = MetricsCollector::new();
collector.set_gauge("cpu_usage", 75.5).unwrap();
collector.set_gauge("memory_usage", 2048.0).unwrap();
let prometheus = collector.export_prometheus();
assert!(prometheus.contains("cpu_usage"));
assert!(prometheus.contains("memory_usage"));
assert!(prometheus.contains("75.5"));
assert!(prometheus.contains("2048"));
}
}