use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum MonitoringError {
#[error("Metric collection error: {0}")]
MetricCollection(String),
#[error("Exporter error: {0}")]
Exporter(String),
#[error("Alert manager error: {0}")]
AlertManager(String),
#[error("Dashboard error: {0}")]
Dashboard(String),
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Network error: {0}")]
Network(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricType {
Counter,
Gauge,
Histogram,
Summary,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricValue {
pub value: f64,
pub timestamp: u64,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metric {
pub name: String,
pub metric_type: MetricType,
pub description: String,
pub unit: String,
pub values: Vec<MetricValue>,
pub retention: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
pub name: String,
pub metric_name: String,
pub condition: AlertCondition,
pub threshold: f64,
pub duration: Duration,
pub severity: AlertSeverity,
pub enabled: bool,
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
GreaterThan,
LessThan,
Equal,
NotEqual,
GreaterThanOrEqual,
LessThanOrEqual,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
Critical,
Warning,
Info,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
pub rule_name: String,
pub metric_name: String,
pub current_value: f64,
pub threshold: f64,
pub severity: AlertSeverity,
pub message: String,
pub started_at: u64,
pub resolved_at: Option<u64>,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dashboard {
pub id: String,
pub title: String,
pub description: String,
pub panels: Vec<DashboardPanel>,
pub refresh_interval: Duration,
pub time_range: TimeRange,
pub variables: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardPanel {
pub id: String,
pub title: String,
pub panel_type: PanelType,
pub metric_queries: Vec<String>,
pub position: PanelPosition,
pub options: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PanelType {
LineGraph,
BarChart,
Gauge,
SingleStat,
Table,
HeatMap,
Alert,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PanelPosition {
pub x: u32,
pub y: u32,
pub width: u32,
pub height: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
pub from: String,
pub to: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
pub enabled: bool,
pub prometheus: PrometheusConfig,
pub grafana: GrafanaConfig,
pub alertmanager: AlertManagerConfig,
pub custom_exporters: Vec<ExporterConfig>,
pub dashboards: Vec<Dashboard>,
pub retention_policy: RetentionPolicy,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusConfig {
pub endpoint: String,
pub scrape_interval: Duration,
pub evaluation_interval: Duration,
pub external_labels: HashMap<String, String>,
pub rule_files: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrafanaConfig {
pub endpoint: String,
pub api_key: String,
pub organization: String,
pub datasource: String,
pub auto_provision: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManagerConfig {
pub endpoint: String,
pub receivers: Vec<AlertReceiver>,
pub routing: AlertRouting,
pub inhibit_rules: Vec<InhibitRule>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertReceiver {
pub name: String,
pub webhook_configs: Vec<WebhookConfig>,
pub email_configs: Vec<EmailConfig>,
pub slack_configs: Vec<SlackConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub url: String,
pub http_config: Option<HttpConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailConfig {
pub to: Vec<String>,
pub from: String,
pub subject: String,
pub body: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfig {
pub api_url: String,
pub channel: String,
pub username: String,
pub title: String,
pub text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
pub basic_auth: Option<BasicAuth>,
pub bearer_token: Option<String>,
pub tls_config: Option<TlsConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BasicAuth {
pub username: String,
pub password: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
pub ca_file: Option<String>,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub insecure_skip_verify: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRouting {
pub group_by: Vec<String>,
pub group_wait: Duration,
pub group_interval: Duration,
pub repeat_interval: Duration,
pub receiver: String,
pub routes: Vec<Route>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Route {
pub matchers: HashMap<String, String>,
pub receiver: String,
pub group_by: Vec<String>,
pub continue_route: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InhibitRule {
pub source_matchers: HashMap<String, String>,
pub target_matchers: HashMap<String, String>,
pub equal: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExporterConfig {
pub name: String,
pub endpoint: String,
pub interval: Duration,
pub timeout: Duration,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
pub default_retention: Duration,
pub metric_retentions: HashMap<String, Duration>,
pub downsampling_rules: Vec<DownsamplingRule>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownsamplingRule {
pub resolution: Duration,
pub retention: Duration,
pub aggregation: AggregationType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationType {
Average,
Sum,
Min,
Max,
Count,
}
pub struct MonitoringSystem {
config: MonitoringConfig,
metrics: HashMap<String, Metric>,
alert_rules: Vec<AlertRule>,
active_alerts: Vec<Alert>,
dashboards: Vec<Dashboard>,
}
impl MonitoringSystem {
pub fn new(config: MonitoringConfig) -> Self {
Self {
config,
metrics: HashMap::new(),
alert_rules: Vec::new(),
active_alerts: Vec::new(),
dashboards: Vec::new(),
}
}
pub async fn initialize(&mut self) -> Result<(), MonitoringError> {
if !self.config.enabled {
return Ok(());
}
self.initialize_prometheus().await?;
self.initialize_grafana().await?;
self.initialize_alertmanager().await?;
self.setup_default_metrics().await?;
self.load_dashboards().await?;
Ok(())
}
async fn initialize_prometheus(&self) -> Result<(), MonitoringError> {
tracing::info!(
endpoint = %self.config.prometheus.endpoint,
"Initializing Prometheus"
);
Ok(())
}
async fn initialize_grafana(&self) -> Result<(), MonitoringError> {
if self.config.grafana.auto_provision {
self.provision_grafana_dashboards().await?;
}
Ok(())
}
async fn initialize_alertmanager(&self) -> Result<(), MonitoringError> {
tracing::info!(
endpoint = %self.config.alertmanager.endpoint,
"Initializing AlertManager"
);
Ok(())
}
async fn setup_default_metrics(&mut self) -> Result<(), MonitoringError> {
self.register_metric(Metric {
name: "auth_requests_total".to_string(),
metric_type: MetricType::Counter,
description: "Total number of authentication requests".to_string(),
unit: "requests".to_string(),
values: Vec::new(),
retention: Duration::from_secs(30 * 24 * 3600), });
self.register_metric(Metric {
name: "auth_success_total".to_string(),
metric_type: MetricType::Counter,
description: "Total number of successful authentications".to_string(),
unit: "requests".to_string(),
values: Vec::new(),
retention: Duration::from_secs(30 * 24 * 3600),
});
self.register_metric(Metric {
name: "auth_failures_total".to_string(),
metric_type: MetricType::Counter,
description: "Total number of failed authentications".to_string(),
unit: "requests".to_string(),
values: Vec::new(),
retention: Duration::from_secs(30 * 24 * 3600),
});
self.register_metric(Metric {
name: "authz_checks_total".to_string(),
metric_type: MetricType::Counter,
description: "Total number of authorization checks".to_string(),
unit: "checks".to_string(),
values: Vec::new(),
retention: Duration::from_secs(30 * 24 * 3600),
});
self.register_metric(Metric {
name: "authz_denied_total".to_string(),
metric_type: MetricType::Counter,
description: "Total number of denied authorization checks".to_string(),
unit: "checks".to_string(),
values: Vec::new(),
retention: Duration::from_secs(30 * 24 * 3600),
});
self.register_metric(Metric {
name: "request_duration_seconds".to_string(),
metric_type: MetricType::Histogram,
description: "Request duration in seconds".to_string(),
unit: "seconds".to_string(),
values: Vec::new(),
retention: Duration::from_secs(7 * 24 * 3600), });
self.register_metric(Metric {
name: "active_sessions".to_string(),
metric_type: MetricType::Gauge,
description: "Number of active user sessions".to_string(),
unit: "sessions".to_string(),
values: Vec::new(),
retention: Duration::from_secs(24 * 3600), });
Ok(())
}
pub fn register_metric(&mut self, metric: Metric) {
self.metrics.insert(metric.name.clone(), metric);
}
pub fn record_metric(
&mut self,
name: &str,
value: f64,
labels: HashMap<String, String>,
) -> Result<(), MonitoringError> {
if let Some(metric) = self.metrics.get_mut(name) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
let metric_value = MetricValue {
value,
timestamp: now.as_secs(),
labels,
};
metric.values.push(metric_value);
let cutoff = now.as_secs() - metric.retention.as_secs();
metric.values.retain(|v| v.timestamp > cutoff);
Ok(())
} else {
Err(MonitoringError::MetricCollection(format!(
"Metric not found: {}",
name
)))
}
}
pub fn add_alert_rule(&mut self, rule: AlertRule) {
self.alert_rules.push(rule);
}
pub async fn evaluate_alerts(&mut self) -> Result<(), MonitoringError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
for rule in &self.alert_rules {
if !rule.enabled {
continue;
}
if let Some(metric) = self.metrics.get(&rule.metric_name)
&& let Some(latest_value) = metric.values.last()
{
let should_alert = match rule.condition {
AlertCondition::GreaterThan => latest_value.value > rule.threshold,
AlertCondition::LessThan => latest_value.value < rule.threshold,
AlertCondition::Equal => {
(latest_value.value - rule.threshold).abs() < f64::EPSILON
}
AlertCondition::NotEqual => {
(latest_value.value - rule.threshold).abs() > f64::EPSILON
}
AlertCondition::GreaterThanOrEqual => latest_value.value >= rule.threshold,
AlertCondition::LessThanOrEqual => latest_value.value <= rule.threshold,
};
if should_alert {
let existing_alert = self
.active_alerts
.iter()
.find(|alert| alert.rule_name == rule.name && alert.resolved_at.is_none());
if existing_alert.is_none() {
let alert = Alert {
rule_name: rule.name.clone(),
metric_name: rule.metric_name.clone(),
current_value: latest_value.value,
threshold: rule.threshold,
severity: rule.severity.clone(),
message: format!(
"Alert: {} - Current value: {}, Threshold: {}",
rule.name, latest_value.value, rule.threshold
),
started_at: now.as_secs(),
resolved_at: None,
labels: rule.labels.clone(),
};
self.active_alerts.push(alert.clone());
self.send_alert(&alert).await?;
}
} else {
let mut alerts_to_resolve = Vec::new();
for alert in &mut self.active_alerts {
if alert.rule_name == rule.name && alert.resolved_at.is_none() {
alert.resolved_at = Some(now.as_secs());
alerts_to_resolve.push(alert.clone());
}
}
for alert in &alerts_to_resolve {
self.send_alert_resolution(alert).await?;
}
}
}
}
Ok(())
}
async fn send_alert(&self, alert: &Alert) -> Result<(), MonitoringError> {
tracing::warn!(rule = %alert.rule_name, message = %alert.message, "ALERT triggered");
for receiver in &self.config.alertmanager.receivers {
self.send_to_receiver(receiver, alert).await?;
}
Ok(())
}
async fn send_alert_resolution(&self, alert: &Alert) -> Result<(), MonitoringError> {
tracing::info!(rule = %alert.rule_name, message = %alert.message, "ALERT resolved");
Ok(())
}
async fn send_to_receiver(
&self,
receiver: &AlertReceiver,
alert: &Alert,
) -> Result<(), MonitoringError> {
for webhook in &receiver.webhook_configs {
self.send_webhook_alert(webhook, alert).await?;
}
for email in &receiver.email_configs {
self.send_email_alert(email, alert).await?;
}
for slack in &receiver.slack_configs {
self.send_slack_alert(slack, alert).await?;
}
Ok(())
}
async fn send_webhook_alert(
&self,
_webhook: &WebhookConfig,
_alert: &Alert,
) -> Result<(), MonitoringError> {
Ok(())
}
async fn send_email_alert(
&self,
_email: &EmailConfig,
_alert: &Alert,
) -> Result<(), MonitoringError> {
Ok(())
}
async fn send_slack_alert(
&self,
_slack: &SlackConfig,
_alert: &Alert,
) -> Result<(), MonitoringError> {
Ok(())
}
async fn load_dashboards(&mut self) -> Result<(), MonitoringError> {
self.dashboards = self.config.dashboards.clone();
Ok(())
}
async fn provision_grafana_dashboards(&self) -> Result<(), MonitoringError> {
for dashboard in &self.config.dashboards {
tracing::info!(title = %dashboard.title, "Provisioning Grafana dashboard");
}
Ok(())
}
pub fn export_prometheus_metrics(&self) -> String {
let mut output = String::new();
for (name, metric) in &self.metrics {
output.push_str(&format!("# HELP {} {}\n", name, metric.description));
output.push_str(&format!(
"# TYPE {} {}\n",
name,
match metric.metric_type {
MetricType::Counter => "counter",
MetricType::Gauge => "gauge",
MetricType::Histogram => "histogram",
MetricType::Summary => "summary",
}
));
for value in &metric.values {
let labels = if value.labels.is_empty() {
String::new()
} else {
let label_pairs: Vec<String> = value
.labels
.iter()
.map(|(k, v)| format!("{}=\"{}\"", k, v))
.collect();
format!("{{{}}}", label_pairs.join(","))
};
output.push_str(&format!(
"{}{} {} {}\n",
name,
labels,
value.value,
value.timestamp * 1000
));
}
}
output
}
pub fn get_metrics(&self) -> &HashMap<String, Metric> {
&self.metrics
}
pub fn get_active_alerts(&self) -> &Vec<Alert> {
&self.active_alerts
}
pub fn get_dashboards(&self) -> &Vec<Dashboard> {
&self.dashboards
}
}
impl Default for MonitoringConfig {
fn default() -> Self {
Self {
enabled: true,
prometheus: PrometheusConfig {
endpoint: "http://localhost:9090".to_string(),
scrape_interval: Duration::from_secs(15),
evaluation_interval: Duration::from_secs(15),
external_labels: HashMap::new(),
rule_files: vec!["alerts.yml".to_string()],
},
grafana: GrafanaConfig {
endpoint: "http://localhost:3000".to_string(),
api_key: "".to_string(),
organization: "Main Org.".to_string(),
datasource: "Prometheus".to_string(),
auto_provision: true,
},
alertmanager: AlertManagerConfig {
endpoint: "http://localhost:9093".to_string(),
receivers: vec![AlertReceiver {
name: "default".to_string(),
webhook_configs: Vec::new(),
email_configs: Vec::new(),
slack_configs: Vec::new(),
}],
routing: AlertRouting {
group_by: vec!["alertname".to_string()],
group_wait: Duration::from_secs(10),
group_interval: Duration::from_secs(10),
repeat_interval: Duration::from_secs(3600),
receiver: "default".to_string(),
routes: Vec::new(),
},
inhibit_rules: Vec::new(),
},
custom_exporters: Vec::new(),
dashboards: Vec::new(),
retention_policy: RetentionPolicy {
default_retention: Duration::from_secs(30 * 24 * 3600), metric_retentions: HashMap::new(),
downsampling_rules: Vec::new(),
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_monitoring_system_creation() {
let config = MonitoringConfig::default();
let system = MonitoringSystem::new(config);
assert!(system.metrics.is_empty());
assert!(system.alert_rules.is_empty());
assert!(system.active_alerts.is_empty());
}
#[tokio::test]
async fn test_metric_registration() {
let config = MonitoringConfig::default();
let mut system = MonitoringSystem::new(config);
let metric = Metric {
name: "test_metric".to_string(),
metric_type: MetricType::Counter,
description: "Test metric".to_string(),
unit: "count".to_string(),
values: Vec::new(),
retention: Duration::from_secs(3600),
};
system.register_metric(metric);
assert!(system.metrics.contains_key("test_metric"));
}
#[tokio::test]
async fn test_metric_recording() {
let config = MonitoringConfig::default();
let mut system = MonitoringSystem::new(config);
let metric = Metric {
name: "test_counter".to_string(),
metric_type: MetricType::Counter,
description: "Test counter".to_string(),
unit: "count".to_string(),
values: Vec::new(),
retention: Duration::from_secs(3600),
};
system.register_metric(metric);
let mut labels = HashMap::new();
labels.insert("service".to_string(), "auth".to_string());
let result = system.record_metric("test_counter", 1.0, labels);
assert!(result.is_ok());
let metric = system.metrics.get("test_counter").unwrap();
assert_eq!(metric.values.len(), 1);
assert_eq!(metric.values[0].value, 1.0);
}
#[test]
fn test_prometheus_export() {
let config = MonitoringConfig::default();
let mut system = MonitoringSystem::new(config);
let metric = Metric {
name: "test_gauge".to_string(),
metric_type: MetricType::Gauge,
description: "Test gauge".to_string(),
unit: "value".to_string(),
values: vec![MetricValue {
value: 42.0,
timestamp: 1640995200,
labels: HashMap::new(),
}],
retention: Duration::from_secs(3600),
};
system.register_metric(metric);
let export = system.export_prometheus_metrics();
assert!(export.contains("# HELP test_gauge Test gauge"));
assert!(export.contains("# TYPE test_gauge gauge"));
assert!(export.contains("test_gauge 42"));
}
}