use crate::errors::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::warn;
pub mod alerts;
pub mod collectors;
pub mod exporters;
pub mod health;
pub use collectors::{
AUTH_FAILED_REQUESTS, AUTH_SUCCESSFUL_REQUESTS, AUTH_TOTAL_REQUESTS, SESSION_ACTIVE_COUNT,
SESSION_CREATED_TOTAL, SESSION_EXPIRED_COUNT, TOKEN_CREATION_COUNT, TOKEN_EXPIRATION_COUNT,
TOKEN_VALIDATION_COUNT,
};
/// Settings that control what the monitoring subsystem records and retains.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
/// Master switch for metric storage: when `false`, `record_metric` drops
/// every data point it is handed.
pub enabled: bool,
/// Collection interval; defaults to 60.
/// NOTE(review): unit is not shown in this file — presumably seconds, confirm.
pub collection_interval: u64,
/// Maximum number of entries retained in the metrics history and in the
/// security event log; the oldest entry is evicted once the cap is exceeded.
pub max_history_size: usize,
/// When `true`, the `record_*` methods append metric data points.
pub enable_performance_metrics: bool,
/// When `true`, security events are stored and logged.
pub enable_security_metrics: bool,
/// When `false`, `health_check` returns a single placeholder result
/// instead of probing components.
pub enable_health_checks: bool,
/// External endpoints; not read anywhere in this file.
/// NOTE(review): presumably consumed by the exporters — confirm.
pub external_endpoints: Vec<String>,
}
impl Default for MonitoringConfig {
    /// Returns a configuration with every feature switched on, an interval of
    /// 60, a history cap of 1000 entries and no external endpoints.
    fn default() -> Self {
        let external_endpoints = Vec::new();
        Self {
            external_endpoints,
            enable_health_checks: true,
            enable_security_metrics: true,
            enable_performance_metrics: true,
            max_history_size: 1000,
            collection_interval: 60,
            enabled: true,
        }
    }
}
/// A single recorded metric sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDataPoint {
/// Metric name, e.g. `"auth_requests_total"`.
pub name: String,
/// Sampled value.
pub value: f64,
/// Sample time as whole seconds since the Unix epoch.
pub timestamp: u64,
/// Free-form key/value labels attached to the sample.
pub labels: HashMap<String, String>,
/// Kind of metric this sample belongs to.
pub metric_type: MetricType,
}
/// Kind of metric a [`MetricDataPoint`] represents.
///
/// Only `Counter` and `Gauge` are produced by the code in this file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MetricType {
/// Used here for request, token and MFA counts.
Counter,
/// Used here for the active session count.
Gauge,
/// Not produced in this file.
Histogram,
/// Not produced in this file.
Summary,
}
/// A security-relevant occurrence captured by the monitoring subsystem.
///
/// Construct one directly or through [`SecurityEvent::builder`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityEvent {
/// Category of the event.
pub event_type: SecurityEventType,
/// User the event relates to, when known.
pub user_id: Option<String>,
/// Originating IP address, when known.
pub ip_address: Option<String>,
/// Additional free-form key/value context.
pub details: HashMap<String, String>,
/// How serious the event is.
pub severity: SecurityEventSeverity,
/// Event time as whole seconds since the Unix epoch.
pub timestamp: u64,
}
impl SecurityEvent {
    /// Starts a [`SecurityEventBuilder`] for an event of the given type and
    /// severity.
    ///
    /// The builder begins with no user, no IP address and an empty detail map.
    pub fn builder(
        event_type: SecurityEventType,
        severity: SecurityEventSeverity,
    ) -> SecurityEventBuilder {
        let (user_id, ip_address) = (None, None);
        SecurityEventBuilder {
            details: HashMap::new(),
            ip_address,
            user_id,
            severity,
            event_type,
        }
    }
}
/// Incrementally assembles a [`SecurityEvent`].
///
/// Obtained from [`SecurityEvent::builder`]; the event timestamp is taken
/// when `build` is called.
#[derive(Debug, Clone)]
pub struct SecurityEventBuilder {
    /// Category of the event being built.
    event_type: SecurityEventType,
    /// Severity of the event being built.
    severity: SecurityEventSeverity,
    /// Optional user the event relates to.
    user_id: Option<String>,
    /// Optional originating IP address.
    ip_address: Option<String>,
    /// Accumulated key/value context.
    details: HashMap<String, String>,
}
impl SecurityEventBuilder {
pub fn user(mut self, user_id: impl Into<String>) -> Self {
self.user_id = Some(user_id.into());
self
}
pub fn ip(mut self, ip_address: impl Into<String>) -> Self {
self.ip_address = Some(ip_address.into());
self
}
pub fn detail(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.details.insert(key.into(), value.into());
self
}
pub fn details(
mut self,
iter: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
for (k, v) in iter {
self.details.insert(k.into(), v.into());
}
self
}
pub fn build(self) -> SecurityEvent {
SecurityEvent {
event_type: self.event_type,
user_id: self.user_id,
ip_address: self.ip_address,
details: self.details,
severity: self.severity,
timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
}
}
}
/// Category of a [`SecurityEvent`].
///
/// Within this file only `FailedLogin` is emitted automatically (by
/// `record_auth_failure`); the remaining variants are available to callers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SecurityEventType {
FailedLogin,
AccountLockout,
PrivilegeEscalation,
UnusualActivity,
TokenManipulation,
ConfigurationChange,
SystemError,
}
/// Severity of a [`SecurityEvent`].
///
/// Variants carry explicit discriminants 1–4 and the derived `PartialOrd`
/// orders them `Low < Medium < High < Critical`. Events with `Critical`
/// severity are additionally logged at error level when recorded.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub enum SecurityEventSeverity {
Low = 1,
Medium = 2,
High = 3,
Critical = 4,
}
/// Shared atomic counters describing authentication activity.
///
/// Cloning the struct clones the `Arc`s, so every clone observes and updates
/// the same underlying counters.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Total authentication requests seen.
    pub auth_requests: Arc<AtomicU64>,
    /// Successful authentications.
    pub auth_successes: Arc<AtomicU64>,
    /// Failed authentications.
    pub auth_failures: Arc<AtomicU64>,
    /// Tokens created.
    pub token_creations: Arc<AtomicU64>,
    /// Token validations performed.
    pub token_validations: Arc<AtomicU64>,
    /// Most recently reported number of active sessions.
    pub active_sessions: Arc<AtomicU64>,
    /// MFA challenges issued.
    pub mfa_challenges: Arc<AtomicU64>,
    /// Smoothed response time, stored in microseconds.
    pub avg_response_time: Arc<AtomicU64>,
}

impl Default for PerformanceMetrics {
    /// Creates a set of independent counters, all starting at zero.
    fn default() -> Self {
        // Every call allocates a distinct counter so fields never alias.
        let zeroed = || Arc::new(AtomicU64::new(0));
        Self {
            auth_requests: zeroed(),
            auth_successes: zeroed(),
            auth_failures: zeroed(),
            token_creations: zeroed(),
            token_validations: zeroed(),
            active_sessions: zeroed(),
            mfa_challenges: zeroed(),
            avg_response_time: zeroed(),
        }
    }
}
/// Outcome of a component health probe.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum HealthStatus {
/// Component is operating normally.
Healthy,
/// Component works but shows elevated failures or slowness.
Degraded,
/// Component is failing; used here when the auth failure rate exceeds 50%.
Unhealthy,
/// Component is unavailable; used here when the storage probe fails.
Critical,
}
/// Result of probing a single component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
/// Name of the probed component, e.g. `"storage"`.
pub component: String,
/// Probe outcome.
pub status: HealthStatus,
/// Human-readable explanation of the outcome.
pub message: String,
/// Time of the probe as whole seconds since the Unix epoch.
pub timestamp: u64,
/// How long the probe took, in milliseconds.
pub response_time: u64,
}
/// Central collector for performance counters, metric history, security
/// events and cached health results.
pub struct MonitoringManager {
/// Feature switches and retention limits.
config: MonitoringConfig,
/// Live atomic counters.
performance: PerformanceMetrics,
/// Recorded metric samples, oldest first, capped at `max_history_size`.
metrics_history: Arc<RwLock<Vec<MetricDataPoint>>>,
/// Recorded security events, oldest first, capped at `max_history_size`.
security_events: Arc<RwLock<Vec<SecurityEvent>>>,
/// Latest health result per component, refreshed by `health_check`.
health_results: Arc<RwLock<HashMap<String, HealthCheckResult>>>,
}
impl MonitoringManager {
/// Creates a manager with zeroed counters and empty history, event log and
/// health cache, governed by `config`.
pub fn new(config: MonitoringConfig) -> Self {
    let performance = PerformanceMetrics::default();
    let metrics_history = Arc::new(RwLock::new(Vec::new()));
    let security_events = Arc::new(RwLock::new(Vec::new()));
    let health_results = Arc::new(RwLock::new(HashMap::new()));
    Self {
        config,
        performance,
        metrics_history,
        security_events,
        health_results,
    }
}
/// Counts one authentication request and, when performance metrics are
/// enabled, records the new running total as an `auth_requests_total` sample.
pub async fn record_auth_request(&self) {
    // Derive the total from the value returned by `fetch_add` instead of a
    // separate `load`, so the sample reflects exactly this request's
    // increment even when other tasks bump the counter concurrently.
    let total = self
        .performance
        .auth_requests
        .fetch_add(1, Ordering::Relaxed)
        .wrapping_add(1);
    if !self.config.enable_performance_metrics {
        return;
    }
    self.record_metric(MetricDataPoint {
        name: "auth_requests_total".to_string(),
        value: total as f64,
        timestamp: current_timestamp(),
        labels: HashMap::new(),
        metric_type: MetricType::Counter,
    })
    .await;
}
/// Counts one successful authentication, folds `duration` into the smoothed
/// response time and, when performance metrics are enabled, records an
/// `auth_attempts_total` sample labelled with the result and `user_id`.
pub async fn record_auth_success(&self, user_id: &str, duration: Duration) {
    let successes = &self.performance.auth_successes;
    successes.fetch_add(1, Ordering::Relaxed);
    self.update_avg_response_time(duration).await;
    if !self.config.enable_performance_metrics {
        return;
    }
    let labels = HashMap::from([
        ("result".to_string(), "success".to_string()),
        ("user_id".to_string(), user_id.to_string()),
    ]);
    let sample = MetricDataPoint {
        name: "auth_attempts_total".to_string(),
        value: 1.0,
        timestamp: current_timestamp(),
        labels,
        metric_type: MetricType::Counter,
    };
    self.record_metric(sample).await;
}
pub async fn record_auth_failure(&self, user_id: Option<&str>, reason: &str) {
self.performance
.auth_failures
.fetch_add(1, Ordering::Relaxed);
if self.config.enable_security_metrics {
let mut details = HashMap::new();
details.insert("reason".to_string(), reason.to_string());
if let Some(user) = user_id {
details.insert("user_id".to_string(), user.to_string());
}
let security_event = SecurityEvent {
event_type: SecurityEventType::FailedLogin,
user_id: user_id.map(|s| s.to_string()),
ip_address: None, details,
severity: SecurityEventSeverity::Medium,
timestamp: current_timestamp(),
};
self.record_security_event(security_event).await;
}
if self.config.enable_performance_metrics {
let mut labels = HashMap::new();
labels.insert("result".to_string(), "failure".to_string());
labels.insert("reason".to_string(), reason.to_string());
self.record_metric(MetricDataPoint {
name: "auth_attempts_total".to_string(),
value: 1.0,
timestamp: current_timestamp(),
labels,
metric_type: MetricType::Counter,
})
.await;
}
}
/// Counts one token creation and, when performance metrics are enabled,
/// records a `tokens_created_total` sample labelled with `token_type`.
pub async fn record_token_creation(&self, token_type: &str) {
    let creations = &self.performance.token_creations;
    creations.fetch_add(1, Ordering::Relaxed);
    if !self.config.enable_performance_metrics {
        return;
    }
    let labels = HashMap::from([("token_type".to_string(), token_type.to_string())]);
    let sample = MetricDataPoint {
        name: "tokens_created_total".to_string(),
        value: 1.0,
        timestamp: current_timestamp(),
        labels,
        metric_type: MetricType::Counter,
    };
    self.record_metric(sample).await;
}
/// Counts one token validation and, when performance metrics are enabled,
/// records a `tokens_validated_total` sample labelled `valid` or `invalid`.
pub async fn record_token_validation(&self, valid: bool) {
    let validations = &self.performance.token_validations;
    validations.fetch_add(1, Ordering::Relaxed);
    if !self.config.enable_performance_metrics {
        return;
    }
    let outcome = if valid { "valid" } else { "invalid" };
    let labels = HashMap::from([("result".to_string(), outcome.to_string())]);
    let sample = MetricDataPoint {
        name: "tokens_validated_total".to_string(),
        value: 1.0,
        timestamp: current_timestamp(),
        labels,
        metric_type: MetricType::Counter,
    };
    self.record_metric(sample).await;
}
/// Overwrites the active session gauge with `count` and, when performance
/// metrics are enabled, records an `active_sessions` gauge sample.
pub async fn update_session_count(&self, count: u64) {
    let sessions = &self.performance.active_sessions;
    sessions.store(count, Ordering::Relaxed);
    if !self.config.enable_performance_metrics {
        return;
    }
    let sample = MetricDataPoint {
        name: "active_sessions".to_string(),
        value: count as f64,
        timestamp: current_timestamp(),
        labels: HashMap::new(),
        metric_type: MetricType::Gauge,
    };
    self.record_metric(sample).await;
}
/// Counts one MFA challenge and, when performance metrics are enabled,
/// records an `mfa_challenges_total` sample labelled with `method`.
pub async fn record_mfa_challenge(&self, method: &str) {
    let challenges = &self.performance.mfa_challenges;
    challenges.fetch_add(1, Ordering::Relaxed);
    if !self.config.enable_performance_metrics {
        return;
    }
    let labels = HashMap::from([("method".to_string(), method.to_string())]);
    let sample = MetricDataPoint {
        name: "mfa_challenges_total".to_string(),
        value: 1.0,
        timestamp: current_timestamp(),
        labels,
        metric_type: MetricType::Counter,
    };
    self.record_metric(sample).await;
}
/// Logs and stores a security event.
///
/// Does nothing when security metrics are disabled. Every event is logged at
/// warn level; `Critical` events are additionally logged at error level. The
/// event log is capped at `max_history_size`, evicting the oldest entries.
pub async fn record_security_event(&self, event: SecurityEvent) {
    if !self.config.enable_security_metrics {
        return;
    }
    // Log before taking the write lock: the lock is then held only for the
    // push/trim, and the event can be moved into the log without a clone.
    tracing::warn!(
        "Security event: {:?} - User: {:?}, Severity: {:?}",
        event.event_type,
        event.user_id,
        event.severity
    );
    if event.severity == SecurityEventSeverity::Critical {
        tracing::error!("CRITICAL security event: {:?}", event);
    }

    let mut events = self.security_events.write().await;
    events.push(event);
    // Drop however many of the oldest entries exceed the cap.
    let excess = events.len().saturating_sub(self.config.max_history_size);
    if excess > 0 {
        events.drain(..excess);
    }
}
/// Appends `metric` to the history unless monitoring is disabled, evicting
/// the oldest sample once the history exceeds `max_history_size`.
async fn record_metric(&self, metric: MetricDataPoint) {
    if self.config.enabled {
        let capacity = self.config.max_history_size;
        let mut history = self.metrics_history.write().await;
        history.push(metric);
        // Front removal shifts every remaining element (O(n) per eviction).
        if history.len() > capacity {
            history.remove(0);
        }
    }
}
/// Folds `duration` into the smoothed response time (stored in microseconds).
///
/// The stored value is replaced by the midpoint of the previous value and the
/// new sample; a stored value of 0 is treated as "no sample yet" and is
/// replaced by the sample outright.
async fn update_avg_response_time(&self, duration: Duration) {
    // Saturate rather than silently truncate if the duration exceeds u64 µs.
    let sample = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
    // `fetch_update` retries on contention, so concurrent callers cannot
    // overwrite each other's contribution the way a separate load/store can.
    // The closure always returns `Some`, so the result is always `Ok`.
    let _ = self.performance.avg_response_time.fetch_update(
        Ordering::Relaxed,
        Ordering::Relaxed,
        |current| {
            Some(if current == 0 {
                sample
            } else {
                // floor((current + sample) / 2) without overflowing u64.
                current / 2 + sample / 2 + (current & sample & 1)
            })
        },
    );
}
/// Returns a snapshot of every performance counter keyed by name.
///
/// The smoothed response time is exposed under `avg_response_time_us`.
pub fn get_performance_metrics(&self) -> HashMap<String, u64> {
    let perf = &self.performance;
    let counters = [
        ("auth_requests", &perf.auth_requests),
        ("auth_successes", &perf.auth_successes),
        ("auth_failures", &perf.auth_failures),
        ("token_creations", &perf.token_creations),
        ("token_validations", &perf.token_validations),
        ("active_sessions", &perf.active_sessions),
        ("mfa_challenges", &perf.mfa_challenges),
        ("avg_response_time_us", &perf.avg_response_time),
    ];
    counters
        .iter()
        .map(|&(name, counter)| (name.to_string(), counter.load(Ordering::Relaxed)))
        .collect()
}
/// Returns the most recent security events in chronological order (oldest
/// first).
///
/// At most `limit` events are returned; `None` means a limit of 100. When
/// more events are stored than requested, the oldest ones are omitted.
pub async fn get_security_events(&self, limit: Option<usize>) -> Vec<SecurityEvent> {
    let events = self.security_events.read().await;
    let limit = limit.unwrap_or(100);
    // Slice off the tail so both the "fits" and "truncated" cases share one
    // ordering; previously the truncated case came back newest-first.
    let start = events.len().saturating_sub(limit);
    events[start..].to_vec()
}
/// Returns a copy of the recorded metric samples, optionally restricted to
/// those whose name equals `metric_name`.
pub async fn get_metrics_history(&self, metric_name: Option<&str>) -> Vec<MetricDataPoint> {
    let history = self.metrics_history.read().await;
    match metric_name {
        None => history.to_vec(),
        Some(wanted) => history
            .iter()
            .filter(|sample| sample.name == wanted)
            .cloned()
            .collect(),
    }
}
pub async fn health_check(&self) -> Result<HashMap<String, HealthCheckResult>> {
if !self.config.enable_health_checks {
let mut results = HashMap::new();
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
results.insert(
"monitoring".to_string(),
HealthCheckResult {
component: "monitoring".to_string(),
status: HealthStatus::Healthy,
message: "Health checks disabled; monitoring subsystem not active"
.to_string(),
timestamp,
response_time: 0,
},
);
return Ok(results);
}
let mut results = HashMap::new();
let start_time = SystemTime::now();
let auth_health = self.check_auth_health().await;
results.insert("authentication".to_string(), auth_health);
let storage_health = self.check_storage_health().await;
results.insert("storage".to_string(), storage_health);
let token_health = self.check_token_health().await;
results.insert("tokens".to_string(), token_health);
let mut health_cache = self.health_results.write().await;
for (component, result) in &results {
health_cache.insert(component.clone(), result.clone());
}
let elapsed = start_time.elapsed().unwrap_or_default();
tracing::debug!("Health check completed in {:?}", elapsed);
Ok(results)
}
/// Derives authentication health from the failure ratio of the counters.
///
/// A failure rate above 50% is `Unhealthy`, above 20% is `Degraded`, and
/// anything else (including no requests at all) is `Healthy`.
async fn check_auth_health(&self) -> HealthCheckResult {
    // Monotonic clock for measuring how long the probe took.
    let started = std::time::Instant::now();
    let requests = self.performance.auth_requests.load(Ordering::Relaxed);
    let failures = self.performance.auth_failures.load(Ordering::Relaxed);

    // With no requests recorded there is nothing to judge; treat as 0%.
    let failure_rate = if requests > 0 {
        failures as f64 / requests as f64
    } else {
        0.0
    };
    let status = if failure_rate > 0.5 {
        HealthStatus::Unhealthy
    } else if failure_rate > 0.2 {
        HealthStatus::Degraded
    } else {
        HealthStatus::Healthy
    };
    let message = match status {
        HealthStatus::Healthy => "Authentication system operating normally".to_string(),
        HealthStatus::Degraded => format!("High failure rate: {:.1}%", failure_rate * 100.0),
        HealthStatus::Unhealthy => {
            format!("Critical failure rate: {:.1}%", failure_rate * 100.0)
        }
        // Never produced above; kept so the match stays exhaustive.
        HealthStatus::Critical => "Authentication system down".to_string(),
    };

    HealthCheckResult {
        component: "authentication".to_string(),
        status,
        message,
        timestamp: current_timestamp(),
        response_time: started.elapsed().as_millis() as u64,
    }
}
/// Probes storage connectivity and maps the outcome to a health result.
///
/// A successful probe slower than 5000 ms is `Degraded`, any other success
/// is `Healthy`, and a failed probe is logged and reported as `Critical`.
async fn check_storage_health(&self) -> HealthCheckResult {
    // Monotonic clock for measuring how long the probe took.
    let started = std::time::Instant::now();

    // NOTE(review): the connectivity probe itself is bounded by a 5 second
    // timeout, so a successful reading above 5000 ms is barely reachable;
    // consider a lower "degraded" threshold.
    let (status, message) = match self.test_storage_connectivity().await {
        Ok(response_time_ms) if response_time_ms > 5000 => (
            HealthStatus::Degraded,
            "Storage system slow but operational",
        ),
        Ok(_) => (HealthStatus::Healthy, "Storage system operational"),
        Err(e) => {
            warn!("Storage health check failed: {}", e);
            (
                HealthStatus::Critical,
                "Storage system connectivity failed",
            )
        }
    };

    HealthCheckResult {
        component: "storage".to_string(),
        status,
        message: message.to_string(),
        timestamp: current_timestamp(),
        response_time: started.elapsed().as_millis() as u64,
    }
}
/// Runs the storage ping under a 5 second timeout.
///
/// Returns the ping's duration in milliseconds.
///
/// # Errors
///
/// Returns an error if the ping itself fails or does not finish within the
/// timeout.
async fn test_storage_connectivity(
    &self,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
    // Use the monotonic clock: with `SystemTime`, a wall-clock step backwards
    // during the ping made `elapsed()` fail and the probe report an error
    // even though storage was reachable.
    let started = std::time::Instant::now();
    let bounded_ping = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        self.attempt_storage_ping(),
    );
    match bounded_ping.await {
        Ok(outcome) => {
            outcome?;
            Ok(started.elapsed().as_millis() as u64)
        }
        Err(_) => Err("Storage connectivity test timed out".into()),
    }
}
/// Acquires read access to the metrics history and the health cache, in that
/// order, holding both until the function returns; always succeeds once both
/// locks are obtained.
async fn attempt_storage_ping(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Tuple fields are evaluated left to right, preserving acquisition order.
    let (_metrics_guard, _health_guard) = (
        self.metrics_history.read().await,
        self.health_results.read().await,
    );
    Ok(())
}
/// Reports token subsystem health.
///
/// Always `Healthy`; the message includes the number of token validations
/// counted so far.
async fn check_token_health(&self) -> HealthCheckResult {
    // Monotonic clock for measuring how long the probe took.
    let started = std::time::Instant::now();
    let validations = self.performance.token_validations.load(Ordering::Relaxed);
    HealthCheckResult {
        component: "tokens".to_string(),
        status: HealthStatus::Healthy,
        message: format!("Token system operational - {} validations", validations),
        timestamp: current_timestamp(),
        response_time: started.elapsed().as_millis() as u64,
    }
}
/// Renders the performance counters in Prometheus text exposition format.
///
/// Each metric is emitted as `auth_<name>` with `# HELP` and `# TYPE` lines.
/// Metrics are written in name order so the output is stable between calls.
pub async fn export_prometheus_metrics(&self) -> String {
    // These values are overwritten rather than only incremented, so they
    // must be declared as gauges; everything else is a monotonic counter.
    const GAUGES: [&str; 2] = ["active_sessions", "avg_response_time_us"];

    let mut metrics: Vec<(String, u64)> = self.get_performance_metrics().into_iter().collect();
    // HashMap iteration order is unspecified; sort for deterministic output.
    metrics.sort_unstable_by(|left, right| left.0.cmp(&right.0));

    let mut output = String::new();
    for (name, value) in metrics {
        let kind = if GAUGES.contains(&name.as_str()) {
            "gauge"
        } else {
            "counter"
        };
        output.push_str(&format!(
            "# HELP auth_{} Authentication framework metric\n",
            name
        ));
        output.push_str(&format!("# TYPE auth_{} {}\n", name, kind));
        output.push_str(&format!("auth_{} {}\n", name, value));
    }
    output
}
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or 0 if the system clock is set before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio;

    /// Builds a manager backed by the default configuration.
    fn default_manager() -> MonitoringManager {
        MonitoringManager::new(MonitoringConfig::default())
    }

    /// A fresh manager starts with zeroed counters.
    #[tokio::test]
    async fn test_monitoring_manager_creation() {
        let snapshot = default_manager().get_performance_metrics();
        assert_eq!(snapshot["auth_requests"], 0);
    }

    /// Each recorded request bumps the request counter by one.
    #[tokio::test]
    async fn test_auth_request_recording() {
        let manager = default_manager();
        for _ in 0..2 {
            manager.record_auth_request().await;
        }
        let snapshot = manager.get_performance_metrics();
        assert_eq!(snapshot["auth_requests"], 2);
    }

    /// A recorded event can be read back from the event log.
    #[tokio::test]
    async fn test_security_event_recording() {
        let manager = default_manager();
        manager
            .record_security_event(SecurityEvent {
                event_type: SecurityEventType::FailedLogin,
                user_id: Some("test_user".to_string()),
                ip_address: Some("127.0.0.1".to_string()),
                details: HashMap::new(),
                severity: SecurityEventSeverity::Medium,
                timestamp: current_timestamp(),
            })
            .await;
        let stored = manager.get_security_events(None).await;
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].event_type, SecurityEventType::FailedLogin);
    }

    /// The builder carries every supplied field into the finished event.
    #[tokio::test]
    async fn test_security_event_builder() {
        let builder = SecurityEvent::builder(
            SecurityEventType::AccountLockout,
            SecurityEventSeverity::High,
        );
        let built = builder
            .user("user-42")
            .ip("10.0.0.1")
            .detail("reason", "too many failures")
            .detail("attempts", "5")
            .build();
        assert_eq!(built.event_type, SecurityEventType::AccountLockout);
        assert_eq!(built.severity, SecurityEventSeverity::High);
        assert_eq!(built.user_id.as_deref(), Some("user-42"));
        assert_eq!(built.ip_address.as_deref(), Some("10.0.0.1"));
        assert_eq!(built.details.len(), 2);
        assert_eq!(built.details["reason"], "too many failures");
        assert!(built.timestamp > 0);
    }

    /// A builder with no optional fields yields an event with none set.
    #[tokio::test]
    async fn test_security_event_builder_minimal() {
        let built = SecurityEvent::builder(
            SecurityEventType::SystemError,
            SecurityEventSeverity::Low,
        )
        .build();
        assert_eq!(built.event_type, SecurityEventType::SystemError);
        assert!(built.user_id.is_none());
        assert!(built.ip_address.is_none());
        assert!(built.details.is_empty());
    }

    /// The health check reports all three probed components.
    #[tokio::test]
    async fn test_health_check() {
        let report = default_manager().health_check().await.unwrap();
        assert!(report.contains_key("authentication"));
        assert!(report.contains_key("storage"));
        assert!(report.contains_key("tokens"));
    }

    /// The Prometheus export contains the metric plus HELP/TYPE metadata.
    #[tokio::test]
    async fn test_prometheus_export() {
        let manager = default_manager();
        manager.record_auth_request().await;
        let exported = manager.export_prometheus_metrics().await;
        assert!(exported.contains("auth_auth_requests"));
        assert!(exported.contains("# HELP"));
        assert!(exported.contains("# TYPE"));
    }
}