use crate::errors::{AuthError, Result};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{Arc, atomic::AtomicU64},
time::{Duration, SystemTime},
};
use tokio::sync::RwLock;
#[cfg(feature = "prometheus")]
use prometheus::{
Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Opts, Registry,
register_gauge_with_registry, register_histogram_with_registry,
register_int_counter_with_registry, register_int_gauge_with_registry,
};
pub struct ObservabilityManager {
#[cfg(feature = "prometheus")]
registry: Registry,
#[cfg(feature = "prometheus")]
metrics: PrometheusMetrics,
performance_metrics: Arc<RwLock<PerformanceMetrics>>,
security_monitor: Arc<SecurityMonitor>,
config: ObservabilityConfig,
}
#[cfg(feature = "prometheus")]
#[derive(Clone)]
pub struct PrometheusMetrics {
pub auth_requests_total: IntCounter,
pub auth_success_total: IntCounter,
pub auth_failures_total: IntCounter,
pub auth_duration: Histogram,
pub tokens_issued_total: IntCounter,
pub tokens_validated_total: IntCounter,
pub tokens_revoked_total: IntCounter,
pub active_tokens: IntGauge,
pub sessions_created_total: IntCounter,
pub sessions_destroyed_total: IntCounter,
pub active_sessions: IntGauge,
pub session_duration: Histogram,
pub storage_operations_total: IntCounter,
pub storage_errors_total: IntCounter,
pub storage_latency: Histogram,
pub storage_memory_usage: Gauge,
pub security_events_total: IntCounter,
pub rate_limit_hits_total: IntCounter,
pub suspicious_activity_total: IntCounter,
pub cpu_usage: Gauge,
pub memory_usage: Gauge,
pub concurrent_requests: IntGauge,
}
#[derive(Debug, Clone, Default)]
pub struct PerformanceMetrics {
pub requests_per_second: f64,
pub average_response_time: Duration,
pub p95_response_time: Duration,
pub p99_response_time: Duration,
pub error_rate: f64,
pub active_connections: u64,
pub memory_usage_mb: f64,
pub cpu_usage_percent: f64,
pub cache_hit_rate: f64,
pub storage_operations_per_second: f64,
}
pub struct SecurityMonitor {
failed_attempts: Arc<AtomicU64>,
rate_limit_violations: Arc<AtomicU64>,
suspicious_patterns: Arc<RwLock<HashMap<String, SuspiciousActivity>>>,
security_events: Arc<RwLock<Vec<SecurityEvent>>>,
threat_levels: Arc<RwLock<HashMap<String, ThreatLevel>>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspiciousActivity {
pub user_id: String,
pub ip_address: String,
pub activity_type: String,
pub count: u64,
pub first_seen: SystemTime,
pub last_seen: SystemTime,
pub risk_score: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityEvent {
pub event_id: String,
pub event_type: SecurityEventType,
pub timestamp: SystemTime,
pub user_id: Option<String>,
pub ip_address: Option<String>,
pub details: HashMap<String, String>,
pub severity: EventSeverity,
pub action_taken: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SecurityEventType {
AuthFailure,
SuspiciousLogin,
RateLimitExceeded,
TokenAbuse,
PrivilegeEscalation,
DataExfiltration,
BruteForceAttempt,
AccountLockout,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventSeverity {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ThreatLevel {
None,
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
pub enable_prometheus: bool,
pub enable_opentelemetry: bool,
pub enable_security_monitoring: bool,
pub metrics_retention_hours: u64,
pub trace_sampling_ratio: f64,
pub security_event_max_count: usize,
pub performance_window_seconds: u64,
}
impl Default for ObservabilityConfig {
fn default() -> Self {
Self {
enable_prometheus: true,
enable_opentelemetry: true,
enable_security_monitoring: true,
metrics_retention_hours: 24,
trace_sampling_ratio: 0.1,
security_event_max_count: 10000,
performance_window_seconds: 300, }
}
}
impl ObservabilityManager {
pub fn new() -> Result<Self> {
Self::with_config(ObservabilityConfig::default())
}
pub fn with_config(config: ObservabilityConfig) -> Result<Self> {
#[cfg(feature = "prometheus")]
let registry = Registry::new();
#[cfg(feature = "prometheus")]
let metrics = PrometheusMetrics::new(®istry)?;
let performance_metrics = Arc::new(RwLock::new(PerformanceMetrics::default()));
let security_monitor = Arc::new(SecurityMonitor::new());
let manager = Self {
#[cfg(feature = "prometheus")]
registry,
#[cfg(feature = "prometheus")]
metrics,
performance_metrics,
security_monitor,
config,
};
Ok(manager)
}
pub async fn record_auth_attempt(&self, success: bool, duration: Duration, _method: &str) {
#[cfg(feature = "prometheus")]
{
self.metrics.auth_requests_total.inc();
if success {
self.metrics.auth_success_total.inc();
} else {
self.metrics.auth_failures_total.inc();
}
self.metrics.auth_duration.observe(duration.as_secs_f64());
}
self.update_performance_metrics(duration, success).await;
}
pub async fn record_token_operation(&self, operation: &str, _token_id: &str) {
#[cfg(feature = "prometheus")]
{
match operation {
"issue" => self.metrics.tokens_issued_total.inc(),
"validate" => self.metrics.tokens_validated_total.inc(),
"revoke" => self.metrics.tokens_revoked_total.inc(),
_ => {}
}
}
}
pub async fn record_session_operation(&self, operation: &str, duration: Option<Duration>) {
#[cfg(feature = "prometheus")]
{
match operation {
"create" => self.metrics.sessions_created_total.inc(),
"destroy" => self.metrics.sessions_destroyed_total.inc(),
_ => {}
}
if let Some(dur) = duration {
self.metrics.session_duration.observe(dur.as_secs_f64());
}
}
}
pub async fn record_storage_operation(
&self,
_operation: &str,
latency: Duration,
success: bool,
) {
#[cfg(feature = "prometheus")]
{
self.metrics.storage_operations_total.inc();
if !success {
self.metrics.storage_errors_total.inc();
}
self.metrics.storage_latency.observe(latency.as_secs_f64());
}
}
pub async fn record_security_event(&self, event: SecurityEvent) {
#[cfg(feature = "prometheus")]
{
self.metrics.security_events_total.inc();
match event.event_type {
SecurityEventType::RateLimitExceeded => self.metrics.rate_limit_hits_total.inc(),
SecurityEventType::BruteForceAttempt
| SecurityEventType::SuspiciousLogin
| SecurityEventType::TokenAbuse => self.metrics.suspicious_activity_total.inc(),
_ => {}
}
}
self.security_monitor.record_event(event).await;
}
async fn update_performance_metrics(&self, response_time: Duration, success: bool) {
let mut metrics = self.performance_metrics.write().await;
metrics.average_response_time = Duration::from_millis(
(metrics.average_response_time.as_millis() as f64 * 0.95
+ response_time.as_millis() as f64 * 0.05) as u64,
);
let error_increment = if success { 0.0 } else { 1.0 };
metrics.error_rate = metrics.error_rate * 0.95 + error_increment * 0.05;
}
pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
self.performance_metrics.read().await.clone()
}
pub async fn get_security_events(&self, limit: Option<usize>) -> Vec<SecurityEvent> {
self.security_monitor.get_events(limit).await
}
pub async fn get_user_threat_level(&self, user_id: &str) -> ThreatLevel {
self.security_monitor.get_user_threat_level(user_id).await
}
#[cfg(feature = "prometheus")]
pub fn export_prometheus_metrics(&self) -> Result<String> {
use prometheus::TextEncoder;
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
encoder.encode_to_string(&metric_families).map_err(|e| {
AuthError::Storage(crate::errors::StorageError::OperationFailed {
message: format!("Failed to encode metrics: {}", e),
})
})
}
}
impl Default for ObservabilityManager {
fn default() -> Self {
Self::new().expect("Failed to create observability manager")
}
}
#[cfg(feature = "prometheus")]
impl PrometheusMetrics {
fn new(registry: &Registry) -> Result<Self> {
let auth_requests_total = register_int_counter_with_registry!(
Opts::new(
"auth_requests_total",
"Total number of authentication requests"
),
registry
)?;
let auth_success_total = register_int_counter_with_registry!(
Opts::new(
"auth_success_total",
"Total number of successful authentications"
),
registry
)?;
let auth_failures_total = register_int_counter_with_registry!(
Opts::new(
"auth_failures_total",
"Total number of failed authentications"
),
registry
)?;
let auth_duration = register_histogram_with_registry!(
HistogramOpts::new("auth_duration_seconds", "Authentication request duration"),
registry
)?;
let tokens_issued_total = register_int_counter_with_registry!(
Opts::new("tokens_issued_total", "Total number of tokens issued"),
registry
)?;
let tokens_validated_total = register_int_counter_with_registry!(
Opts::new(
"tokens_validated_total",
"Total number of token validations"
),
registry
)?;
let tokens_revoked_total = register_int_counter_with_registry!(
Opts::new("tokens_revoked_total", "Total number of tokens revoked"),
registry
)?;
let active_tokens = register_int_gauge_with_registry!(
Opts::new("active_tokens", "Number of currently active tokens"),
registry
)?;
let sessions_created_total = register_int_counter_with_registry!(
Opts::new("sessions_created_total", "Total number of sessions created"),
registry
)?;
let sessions_destroyed_total = register_int_counter_with_registry!(
Opts::new(
"sessions_destroyed_total",
"Total number of sessions destroyed"
),
registry
)?;
let active_sessions = register_int_gauge_with_registry!(
Opts::new("active_sessions", "Number of currently active sessions"),
registry
)?;
let session_duration = register_histogram_with_registry!(
HistogramOpts::new("session_duration_seconds", "Session duration"),
registry
)?;
let storage_operations_total = register_int_counter_with_registry!(
Opts::new(
"storage_operations_total",
"Total number of storage operations"
),
registry
)?;
let storage_errors_total = register_int_counter_with_registry!(
Opts::new("storage_errors_total", "Total number of storage errors"),
registry
)?;
let storage_latency = register_histogram_with_registry!(
HistogramOpts::new("storage_latency_seconds", "Storage operation latency"),
registry
)?;
let storage_memory_usage = register_gauge_with_registry!(
Opts::new("storage_memory_usage_bytes", "Storage memory usage"),
registry
)?;
let security_events_total = register_int_counter_with_registry!(
Opts::new("security_events_total", "Total number of security events"),
registry
)?;
let rate_limit_hits_total = register_int_counter_with_registry!(
Opts::new(
"rate_limit_hits_total",
"Total number of rate limit violations"
),
registry
)?;
let suspicious_activity_total = register_int_counter_with_registry!(
Opts::new(
"suspicious_activity_total",
"Total number of suspicious activities"
),
registry
)?;
let cpu_usage = register_gauge_with_registry!(
Opts::new("cpu_usage_percent", "CPU usage percentage"),
registry
)?;
let memory_usage = register_gauge_with_registry!(
Opts::new("memory_usage_bytes", "Memory usage in bytes"),
registry
)?;
let concurrent_requests = register_int_gauge_with_registry!(
Opts::new("concurrent_requests", "Number of concurrent requests"),
registry
)?;
Ok(Self {
auth_requests_total,
auth_success_total,
auth_failures_total,
auth_duration,
tokens_issued_total,
tokens_validated_total,
tokens_revoked_total,
active_tokens,
sessions_created_total,
sessions_destroyed_total,
active_sessions,
session_duration,
storage_operations_total,
storage_errors_total,
storage_latency,
storage_memory_usage,
security_events_total,
rate_limit_hits_total,
suspicious_activity_total,
cpu_usage,
memory_usage,
concurrent_requests,
})
}
}
impl SecurityMonitor {
fn new() -> Self {
Self {
failed_attempts: Arc::new(AtomicU64::new(0)),
rate_limit_violations: Arc::new(AtomicU64::new(0)),
suspicious_patterns: Arc::new(RwLock::new(HashMap::new())),
security_events: Arc::new(RwLock::new(Vec::new())),
threat_levels: Arc::new(RwLock::new(HashMap::new())),
}
}
async fn record_event(&self, event: SecurityEvent) {
let mut events = self.security_events.write().await;
events.push(event.clone());
if events.len() > 10000 {
events.drain(0..1000);
}
if let Some(user_id) = &event.user_id {
self.update_threat_level(user_id, &event).await;
}
}
async fn update_threat_level(&self, user_id: &str, event: &SecurityEvent) {
let mut threat_levels = self.threat_levels.write().await;
let current_level = threat_levels.get(user_id).unwrap_or(&ThreatLevel::None);
let new_level = match (&event.event_type, current_level) {
(SecurityEventType::BruteForceAttempt, _) => ThreatLevel::High,
(SecurityEventType::SuspiciousLogin, ThreatLevel::Low) => ThreatLevel::Medium,
(SecurityEventType::TokenAbuse, _) => ThreatLevel::Medium,
(SecurityEventType::PrivilegeEscalation, _) => ThreatLevel::Critical,
(SecurityEventType::AuthFailure, ThreatLevel::None) => ThreatLevel::Low,
_ => current_level.clone(),
};
threat_levels.insert(user_id.to_string(), new_level);
}
async fn get_events(&self, limit: Option<usize>) -> Vec<SecurityEvent> {
let events = self.security_events.read().await;
let limit = limit.unwrap_or(100);
events.iter().rev().take(limit).cloned().collect()
}
async fn get_user_threat_level(&self, user_id: &str) -> ThreatLevel {
let threat_levels = self.threat_levels.read().await;
threat_levels
.get(user_id)
.unwrap_or(&ThreatLevel::None)
.clone()
}
}
pub struct ObservabilityMiddleware {
manager: Arc<ObservabilityManager>,
}
impl ObservabilityMiddleware {
pub fn new(manager: Arc<ObservabilityManager>) -> Self {
Self { manager }
}
pub async fn instrument_auth<F, T>(
&self,
operation: &str,
user_id: &str,
future: F,
) -> Result<T>
where
F: std::future::Future<Output = Result<T>>,
{
let start = std::time::Instant::now();
let result = future.await;
let duration = start.elapsed();
let success = result.is_ok();
self.manager
.record_auth_attempt(success, duration, operation)
.await;
if !success {
let event = SecurityEvent {
event_id: uuid::Uuid::new_v4().to_string(),
event_type: SecurityEventType::AuthFailure,
timestamp: SystemTime::now(),
user_id: Some(user_id.to_string()),
ip_address: None,
details: HashMap::new(),
severity: EventSeverity::Medium,
action_taken: None,
};
self.manager.record_security_event(event).await;
}
result
}
}