pulseengine_mcp_auth/
manager.rs

1//! Authentication manager implementation
2
3use crate::{
4    audit::{AuditConfig, AuditEvent, AuditEventType, AuditLogger, AuditSeverity, events},
5    config::AuthConfig,
6    jwt::{JwtConfig, JwtManager, TokenPair},
7    models::*,
8    storage::{StorageBackend, create_storage_backend},
9};
10use chrono::{DateTime, Utc};
11use pulseengine_mcp_protocol::{Request, Response};
12use std::collections::HashMap;
13use std::sync::Arc;
14use thiserror::Error;
15use tokio::sync::RwLock;
16use tracing::{debug, error, info, warn};
17
18/// Simple request context for authentication
19#[derive(Debug, Clone)]
20pub struct RequestContext {
21    pub user_id: Option<String>,
22    pub roles: Vec<Role>,
23}
24
25#[derive(Debug, Error, serde::Serialize)]
26pub enum AuthError {
27    #[error("Authentication failed: {0}")]
28    Failed(String),
29
30    #[error("Configuration error: {0}")]
31    Config(String),
32
33    #[error("Storage error: {0}")]
34    Storage(String),
35
36    #[error("Validation error: {0}")]
37    Validation(String),
38}
39
40/// Authentication manager with comprehensive key management
41pub struct AuthenticationManager {
42    config: AuthConfig,
43    /// Validation configuration for rate limiting
44    validation_config: ValidationConfig,
45    /// Storage backend for persistent data
46    storage: Arc<dyn StorageBackend>,
47    /// In-memory cache for fast key lookups
48    api_keys_cache: Arc<RwLock<std::collections::HashMap<String, ApiKey>>>,
49    /// Rate limiting state per IP
50    rate_limit_state: Arc<RwLock<HashMap<String, RateLimitState>>>,
51    /// Per-role rate limiting state (role_key -> IP -> state)
52    role_rate_limit_state: Arc<RwLock<HashMap<String, HashMap<String, RoleRateLimitStats>>>>,
53    /// Audit logger for security events
54    audit_logger: Arc<AuditLogger>,
55    /// JWT manager for token-based authentication
56    jwt_manager: Arc<JwtManager>,
57}
58
59/// Rate limiting state for failed authentication attempts
60#[derive(Debug, Clone)]
61pub struct RateLimitState {
62    /// Number of failed attempts
63    pub failed_attempts: u32,
64    /// When the first attempt in the current window occurred
65    pub window_start: DateTime<Utc>,
66    /// When the client is blocked until (if any)
67    pub blocked_until: Option<DateTime<Utc>>,
68    /// Number of successful requests in current window (for role-based limiting)
69    pub successful_requests: u32,
70    /// When the success tracking window started
71    pub success_window_start: DateTime<Utc>,
72}
73
74/// Per-role rate limiting configuration
75#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
76pub struct RoleRateLimitConfig {
77    /// Maximum requests per time window
78    pub max_requests_per_window: u32,
79    /// Time window duration in minutes
80    pub window_duration_minutes: u64,
81    /// Burst allowance (additional requests allowed briefly)
82    pub burst_allowance: u32,
83    /// Cool-down period after hitting limits (minutes)
84    pub cooldown_duration_minutes: u64,
85}
86
87/// Validation configuration for rate limiting and security
88#[derive(Debug, Clone)]
89pub struct ValidationConfig {
90    /// Maximum failed attempts before rate limiting
91    pub max_failed_attempts: u32,
92    /// Time window for tracking failed attempts (minutes)
93    pub failed_attempt_window_minutes: u64,
94    /// How long to block after max attempts (minutes)
95    pub block_duration_minutes: u64,
96    /// Session timeout (minutes)
97    pub session_timeout_minutes: u64,
98    /// Enable strict IP validation
99    pub strict_ip_validation: bool,
100    /// Enable role-based rate limiting
101    pub enable_role_based_rate_limiting: bool,
102    /// Per-role rate limiting configurations
103    pub role_rate_limits: std::collections::HashMap<String, RoleRateLimitConfig>,
104}
105
106impl Default for ValidationConfig {
107    fn default() -> Self {
108        let mut role_rate_limits = std::collections::HashMap::new();
109
110        // Default role-based rate limits
111        role_rate_limits.insert(
112            "admin".to_string(),
113            RoleRateLimitConfig {
114                max_requests_per_window: 1000,
115                window_duration_minutes: 60,
116                burst_allowance: 100,
117                cooldown_duration_minutes: 5,
118            },
119        );
120
121        role_rate_limits.insert(
122            "operator".to_string(),
123            RoleRateLimitConfig {
124                max_requests_per_window: 500,
125                window_duration_minutes: 60,
126                burst_allowance: 50,
127                cooldown_duration_minutes: 10,
128            },
129        );
130
131        role_rate_limits.insert(
132            "monitor".to_string(),
133            RoleRateLimitConfig {
134                max_requests_per_window: 200,
135                window_duration_minutes: 60,
136                burst_allowance: 20,
137                cooldown_duration_minutes: 15,
138            },
139        );
140
141        role_rate_limits.insert(
142            "device".to_string(),
143            RoleRateLimitConfig {
144                max_requests_per_window: 100,
145                window_duration_minutes: 60,
146                burst_allowance: 10,
147                cooldown_duration_minutes: 20,
148            },
149        );
150
151        role_rate_limits.insert(
152            "custom".to_string(),
153            RoleRateLimitConfig {
154                max_requests_per_window: 50,
155                window_duration_minutes: 60,
156                burst_allowance: 5,
157                cooldown_duration_minutes: 30,
158            },
159        );
160
161        Self {
162            max_failed_attempts: 4,
163            failed_attempt_window_minutes: 15,
164            block_duration_minutes: 30,
165            session_timeout_minutes: 480, // 8 hours
166            strict_ip_validation: true,
167            enable_role_based_rate_limiting: true,
168            role_rate_limits,
169        }
170    }
171}
172
173/// Rate limiting statistics
174#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
175pub struct RateLimitStats {
176    /// Number of IPs being tracked
177    pub total_tracked_ips: usize,
178    /// Number of currently blocked IPs
179    pub currently_blocked_ips: u32,
180    /// Total failed attempts across all IPs
181    pub total_failed_attempts: u64,
182    /// Role-based rate limiting statistics
183    pub role_stats: std::collections::HashMap<String, RoleRateLimitStats>,
184}
185
186/// Per-role rate limiting statistics
187#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
188pub struct RoleRateLimitStats {
189    /// Current requests in window
190    pub current_requests: u32,
191    /// Requests blocked due to rate limits
192    pub blocked_requests: u64,
193    /// Total requests processed
194    pub total_requests: u64,
195    /// Is currently in cooldown
196    pub in_cooldown: bool,
197    /// Cooldown ends at (if in cooldown)
198    pub cooldown_ends_at: Option<DateTime<Utc>>,
199    /// When the current window started
200    pub last_window_start: Option<DateTime<Utc>>,
201}
202
203impl AuthenticationManager {
204    pub async fn new(config: AuthConfig) -> Result<Self, AuthError> {
205        // Create storage backend
206        let storage = create_storage_backend(&config.storage)
207            .await
208            .map_err(|e| AuthError::Storage(e.to_string()))?;
209
210        // Create audit logger
211        let audit_config = AuditConfig::default();
212        let audit_logger =
213            Arc::new(AuditLogger::new(audit_config).await.map_err(|e| {
214                AuthError::Config(format!("Failed to initialize audit logger: {}", e))
215            })?);
216
217        // Create JWT manager
218        let jwt_config = JwtConfig::default();
219        let jwt_manager =
220            Arc::new(JwtManager::new(jwt_config).map_err(|e| {
221                AuthError::Config(format!("Failed to initialize JWT manager: {}", e))
222            })?);
223
224        let manager = Self {
225            storage,
226            validation_config: ValidationConfig::default(),
227            api_keys_cache: Arc::new(RwLock::new(HashMap::new())),
228            rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
229            role_rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
230            audit_logger,
231            jwt_manager,
232            config,
233        };
234
235        // Load initial keys into cache
236        manager.refresh_cache().await?;
237
238        // Log system startup
239        let startup_event = AuditEvent::new(
240            AuditEventType::SystemStartup,
241            AuditSeverity::Info,
242            "auth_manager".to_string(),
243            "Authentication manager initialized successfully".to_string(),
244        );
245        let _ = manager.audit_logger.log(startup_event).await;
246
247        info!("Authentication manager initialized successfully");
248        Ok(manager)
249    }
250
251    pub async fn new_with_validation(
252        config: AuthConfig,
253        validation_config: ValidationConfig,
254    ) -> Result<Self, AuthError> {
255        // Create storage backend
256        let storage = create_storage_backend(&config.storage)
257            .await
258            .map_err(|e| AuthError::Storage(e.to_string()))?;
259
260        // Create audit logger
261        let audit_config = AuditConfig::default();
262        let audit_logger =
263            Arc::new(AuditLogger::new(audit_config).await.map_err(|e| {
264                AuthError::Config(format!("Failed to initialize audit logger: {}", e))
265            })?);
266
267        // Create JWT manager
268        let jwt_config = JwtConfig::default();
269        let jwt_manager =
270            Arc::new(JwtManager::new(jwt_config).map_err(|e| {
271                AuthError::Config(format!("Failed to initialize JWT manager: {}", e))
272            })?);
273
274        let manager = Self {
275            storage,
276            validation_config,
277            api_keys_cache: Arc::new(RwLock::new(HashMap::new())),
278            rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
279            role_rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
280            audit_logger,
281            jwt_manager,
282            config,
283        };
284
285        // Load initial keys into cache
286        manager.refresh_cache().await?;
287
288        // Log system startup
289        let startup_event = AuditEvent::new(
290            AuditEventType::SystemStartup,
291            AuditSeverity::Info,
292            "auth_manager".to_string(),
293            "Authentication manager initialized with custom validation config".to_string(),
294        );
295        let _ = manager.audit_logger.log(startup_event).await;
296
297        info!("Authentication manager initialized with custom validation config");
298        Ok(manager)
299    }
300
301    /// Create a new API key
302    pub async fn create_api_key(
303        &self,
304        name: String,
305        role: Role,
306        expires_at: Option<DateTime<Utc>>,
307        ip_whitelist: Option<Vec<String>>,
308    ) -> Result<ApiKey, AuthError> {
309        let key = ApiKey::new(name, role, expires_at, ip_whitelist.unwrap_or_default());
310
311        // Save to storage
312        self.storage
313            .save_key(&key)
314            .await
315            .map_err(|e| AuthError::Storage(e.to_string()))?;
316
317        // Update cache
318        {
319            let mut cache = self.api_keys_cache.write().await;
320            cache.insert(key.id.clone(), key.clone());
321        }
322
323        // Log key creation event
324        let audit_event = events::key_created(&key.id, "system", &key.role.to_string());
325        let _ = self.audit_logger.log(audit_event).await;
326
327        info!("Created new API key: {} ({})", key.id, key.name);
328        Ok(key)
329    }
330
331    /// Validate an API key with comprehensive security checks
332    pub async fn validate_api_key(
333        &self,
334        key_secret: &str,
335        client_ip: Option<&str>,
336    ) -> Result<Option<AuthContext>, AuthError> {
337        let client_ip = client_ip.unwrap_or("unknown");
338
339        // Check rate limiting first
340        if let Some(blocked_until) = self.check_rate_limit(client_ip).await {
341            // Log rate limiting event
342            let audit_event = AuditEvent::new(
343                AuditEventType::AuthRateLimited,
344                AuditSeverity::Warning,
345                "rate_limiter".to_string(),
346                format!(
347                    "IP {} blocked due to rate limiting until {}",
348                    client_ip,
349                    blocked_until.format("%Y-%m-%d %H:%M:%S UTC")
350                ),
351            )
352            .with_client_ip(client_ip.to_string());
353            let _ = self.audit_logger.log(audit_event).await;
354
355            return Err(AuthError::Failed(format!(
356                "IP {} is rate limited until {}",
357                client_ip,
358                blocked_until.format("%Y-%m-%d %H:%M:%S UTC")
359            )));
360        }
361
362        let key = {
363            let cache = self.api_keys_cache.read().await;
364
365            // Find key by verifying the provided secret against stored hashes
366            cache
367                .values()
368                .find(|key| {
369                    // Use secure verification if available, otherwise fallback to plain text
370                    key.verify_key(key_secret).unwrap_or_default()
371                })
372                .cloned()
373        };
374
375        let key = match key {
376            Some(key) => key,
377            None => {
378                self.record_failed_attempt(client_ip).await;
379
380                // Log authentication failure
381                let audit_event = events::auth_failure(client_ip, "Invalid API key");
382                let _ = self.audit_logger.log(audit_event).await;
383
384                return Err(AuthError::Failed("Invalid API key".to_string()));
385            }
386        };
387
388        // Validate the key
389        if let Err(reason) = self.validate_key_security(&key, client_ip) {
390            self.record_failed_attempt(client_ip).await;
391
392            // Log authentication failure with reason
393            let audit_event = events::auth_failure(client_ip, &reason);
394            let _ = self.audit_logger.log(audit_event).await;
395
396            return Err(AuthError::Failed(reason));
397        }
398
399        // Check role-based rate limiting
400        if let Ok(is_rate_limited) = self.check_role_rate_limit(&key.role, client_ip).await {
401            if is_rate_limited {
402                self.record_failed_attempt(client_ip).await;
403
404                // Log role-based rate limiting
405                let audit_event = events::auth_failure(
406                    client_ip,
407                    &format!(
408                        "Role-based rate limit exceeded for role {}",
409                        self.get_role_key(&key.role)
410                    ),
411                );
412                let _ = self.audit_logger.log(audit_event).await;
413
414                return Err(AuthError::Failed(format!(
415                    "Rate limit exceeded for role {}",
416                    self.get_role_key(&key.role)
417                )));
418            }
419        }
420
421        // Clear any failed attempts for this IP
422        let mut updated_key = key.clone();
423
424        self.clear_failed_attempts(client_ip).await;
425
426        // Update key usage
427        updated_key.mark_used();
428
429        // Update in storage and cache
430        if let Err(e) = self.storage.save_key(&updated_key).await {
431            warn!("Failed to update key usage statistics: {}", e);
432        } else {
433            let mut cache = self.api_keys_cache.write().await;
434            cache.insert(updated_key.id.clone(), updated_key.clone());
435        }
436
437        // Log successful authentication and key usage
438        let auth_event = events::auth_success(&key.id, client_ip);
439        let _ = self.audit_logger.log(auth_event).await;
440
441        let key_usage_event = events::key_used(&key.id, client_ip);
442        let _ = self.audit_logger.log(key_usage_event).await;
443
444        // Return valid auth context
445        Ok(Some(AuthContext {
446            user_id: Some(key.id.clone()),
447            roles: vec![key.role.clone()],
448            api_key_id: Some(key.id.clone()),
449            permissions: self.get_permissions_for_role(&key.role),
450        }))
451    }
452
453    /// Validate an API key (legacy method without IP checking)
454    pub async fn validate_api_key_legacy(
455        &self,
456        key_secret: &str,
457    ) -> Result<Option<AuthContext>, AuthError> {
458        self.validate_api_key(key_secret, None).await
459    }
460
461    /// List all API keys
462    pub async fn list_keys(&self) -> Vec<ApiKey> {
463        let cache = self.api_keys_cache.read().await;
464        cache.values().cloned().collect()
465    }
466
467    /// Get a specific API key by ID
468    pub async fn get_key(&self, key_id: &str) -> Option<ApiKey> {
469        let cache = self.api_keys_cache.read().await;
470        cache.get(key_id).cloned()
471    }
472
473    /// Update an existing API key
474    pub async fn update_key(&self, key: ApiKey) -> Result<(), AuthError> {
475        // Save to storage
476        self.storage
477            .save_key(&key)
478            .await
479            .map_err(|e| AuthError::Storage(e.to_string()))?;
480
481        // Update cache
482        {
483            let mut cache = self.api_keys_cache.write().await;
484            cache.insert(key.id.clone(), key.clone());
485        }
486
487        debug!("Updated API key: {}", key.id);
488        Ok(())
489    }
490
491    /// Revoke/delete an API key
492    pub async fn revoke_key(&self, key_id: &str) -> Result<bool, AuthError> {
493        // Remove from storage
494        self.storage
495            .delete_key(key_id)
496            .await
497            .map_err(|e| AuthError::Storage(e.to_string()))?;
498
499        // Remove from cache
500        let removed = {
501            let mut cache = self.api_keys_cache.write().await;
502            cache.remove(key_id).is_some()
503        };
504
505        if removed {
506            info!("Revoked API key: {}", key_id);
507        } else {
508            warn!("Attempted to revoke non-existent key: {}", key_id);
509        }
510
511        Ok(removed)
512    }
513
514    /// Check if an IP is currently rate limited
515    async fn check_rate_limit(&self, client_ip: &str) -> Option<DateTime<Utc>> {
516        let rate_limits = self.rate_limit_state.read().await;
517
518        if let Some(state) = rate_limits.get(client_ip) {
519            if let Some(blocked_until) = state.blocked_until {
520                if Utc::now() < blocked_until {
521                    return Some(blocked_until);
522                }
523            }
524        }
525
526        None
527    }
528
529    /// Record a failed authentication attempt
530    async fn record_failed_attempt(&self, client_ip: &str) {
531        let mut rate_limits = self.rate_limit_state.write().await;
532        let now = Utc::now();
533
534        let state = rate_limits
535            .entry(client_ip.to_string())
536            .or_insert_with(|| RateLimitState {
537                failed_attempts: 0,
538                window_start: now,
539                blocked_until: None,
540                successful_requests: 0,
541                success_window_start: now,
542            });
543
544        // Check if we're in a new time window
545        let window_duration =
546            chrono::Duration::minutes(self.validation_config.failed_attempt_window_minutes as i64);
547        if now - state.window_start > window_duration {
548            // Reset to new window
549            state.failed_attempts = 1;
550            state.window_start = now;
551            state.blocked_until = None;
552        } else {
553            // Increment attempts in current window
554            state.failed_attempts += 1;
555
556            // Check if we've exceeded the limit
557            if state.failed_attempts >= self.validation_config.max_failed_attempts {
558                let block_duration =
559                    chrono::Duration::minutes(self.validation_config.block_duration_minutes as i64);
560                state.blocked_until = Some(now + block_duration);
561
562                warn!(
563                    "IP {} blocked for {} minutes after {} failed attempts",
564                    client_ip, self.validation_config.block_duration_minutes, state.failed_attempts
565                );
566            }
567        }
568
569        debug!(
570            "Failed attempt #{} from IP {} (window started: {})",
571            state.failed_attempts, client_ip, state.window_start
572        );
573    }
574
575    /// Clear failed attempts for an IP (after successful auth)
576    async fn clear_failed_attempts(&self, client_ip: &str) {
577        let mut rate_limits = self.rate_limit_state.write().await;
578        if rate_limits.remove(client_ip).is_some() {
579            debug!("Cleared failed attempts for IP: {}", client_ip);
580        }
581    }
582
583    /// Validate an API key's security properties
584    fn validate_key_security(&self, key: &ApiKey, client_ip: &str) -> Result<(), String> {
585        // Check if key is active
586        if !key.active {
587            return Err("API key is disabled".to_string());
588        }
589
590        // Check if key has expired
591        if let Some(expires_at) = key.expires_at {
592            if Utc::now() > expires_at {
593                return Err("API key has expired".to_string());
594            }
595        }
596
597        // Check IP whitelist
598        if self.validation_config.strict_ip_validation && !key.ip_whitelist.is_empty() {
599            let is_ip_allowed = key.ip_whitelist.iter().any(|allowed_ip| {
600                // Simple IP matching (can be enhanced with CIDR support)
601                allowed_ip == client_ip || allowed_ip == "*"
602            });
603
604            if !is_ip_allowed {
605                return Err(format!("IP address {client_ip} not allowed for this key"));
606            }
607        }
608
609        Ok(())
610    }
611
612    /// Get permissions for a role
613    fn get_permissions_for_role(&self, role: &Role) -> Vec<String> {
614        match role {
615            Role::Admin => vec![
616                "admin.*".to_string(),
617                "device.*".to_string(),
618                "system.*".to_string(),
619                "mcp.*".to_string(),
620            ],
621            Role::Operator => vec![
622                "device.*".to_string(),
623                "system.status".to_string(),
624                "mcp.tools.*".to_string(),
625                "mcp.resources.read".to_string(),
626            ],
627            Role::Monitor => vec![
628                "device.read".to_string(),
629                "system.status".to_string(),
630                "mcp.resources.read".to_string(),
631            ],
632            Role::Device { allowed_devices } => allowed_devices
633                .iter()
634                .map(|device| format!("device.{device}"))
635                .collect(),
636            Role::Custom { permissions } => permissions.clone(),
637        }
638    }
639
640    /// Get current rate limit statistics
641    pub async fn get_rate_limit_stats(&self) -> RateLimitStats {
642        let rate_limits = self.rate_limit_state.read().await;
643        let role_states = self.role_rate_limit_state.read().await;
644        let now = Utc::now();
645
646        let mut stats = RateLimitStats {
647            total_tracked_ips: rate_limits.len(),
648            currently_blocked_ips: 0,
649            total_failed_attempts: 0,
650            role_stats: std::collections::HashMap::new(),
651        };
652
653        for state in rate_limits.values() {
654            stats.total_failed_attempts += state.failed_attempts as u64;
655
656            if let Some(blocked_until) = state.blocked_until {
657                if now < blocked_until {
658                    stats.currently_blocked_ips += 1;
659                }
660            }
661        }
662
663        // Collect role-based statistics
664        for (role_key, ip_states) in role_states.iter() {
665            let mut role_statistics = RoleRateLimitStats {
666                current_requests: 0,
667                blocked_requests: 0,
668                total_requests: 0,
669                in_cooldown: false,
670                cooldown_ends_at: None,
671                last_window_start: None,
672            };
673
674            for state in ip_states.values() {
675                role_statistics.current_requests += state.current_requests;
676                role_statistics.blocked_requests += state.blocked_requests;
677                role_statistics.total_requests += state.total_requests;
678
679                // Check if any IP is in cooldown for this role
680                if let Some(cooldown_end) = state.cooldown_ends_at {
681                    if now < cooldown_end {
682                        role_statistics.in_cooldown = true;
683                        if role_statistics.cooldown_ends_at.is_none()
684                            || cooldown_end > role_statistics.cooldown_ends_at.unwrap()
685                        {
686                            role_statistics.cooldown_ends_at = Some(cooldown_end);
687                        }
688                    }
689                }
690            }
691
692            stats.role_stats.insert(role_key.clone(), role_statistics);
693        }
694
695        stats
696    }
697
698    /// Clean up old rate limit entries (should be called periodically)
699    pub async fn cleanup_rate_limits(&self) {
700        let mut rate_limits = self.rate_limit_state.write().await;
701        let now = Utc::now();
702        let cleanup_threshold = chrono::Duration::hours(24); // Remove entries older than 24 hours
703
704        let initial_count = rate_limits.len();
705        rate_limits.retain(|_ip, state| {
706            // Keep if blocked and still in block period
707            if let Some(blocked_until) = state.blocked_until {
708                if now < blocked_until {
709                    return true;
710                }
711            }
712
713            // Keep if within the tracking window
714            now - state.window_start < cleanup_threshold
715        });
716
717        let removed_count = initial_count - rate_limits.len();
718        if removed_count > 0 {
719            debug!("Cleaned up {} old rate limit entries", removed_count);
720        }
721    }
722
723    // Role-based rate limiting methods
724
725    /// Check if a role-based request should be rate limited
726    pub async fn check_role_rate_limit(
727        &self,
728        role: &Role,
729        client_ip: &str,
730    ) -> Result<bool, AuthError> {
731        if !self.validation_config.enable_role_based_rate_limiting {
732            return Ok(false); // Rate limiting disabled
733        }
734
735        let role_key = self.get_role_key(role);
736        let role_config = match self.validation_config.role_rate_limits.get(&role_key) {
737            Some(config) => config.clone(),
738            None => {
739                // Use default for custom roles or fallback
740                warn!(
741                    "No rate limit config found for role '{}', using default",
742                    role_key
743                );
744                return Ok(false);
745            }
746        };
747
748        let mut role_states = self.role_rate_limit_state.write().await;
749        let role_state_map = role_states
750            .entry(role_key.clone())
751            .or_insert_with(HashMap::new);
752
753        let now = Utc::now();
754        let state = role_state_map
755            .entry(client_ip.to_string())
756            .or_insert_with(|| RoleRateLimitStats {
757                current_requests: 0,
758                blocked_requests: 0,
759                total_requests: 0,
760                in_cooldown: false,
761                cooldown_ends_at: None,
762                last_window_start: None,
763            });
764
765        // Check if still in cooldown
766        if let Some(cooldown_end) = state.cooldown_ends_at {
767            if now < cooldown_end {
768                state.blocked_requests += 1;
769
770                // Log rate limiting event
771                let audit_event = crate::audit::AuditEvent::new(
772                    crate::audit::AuditEventType::AuthRateLimited,
773                    crate::audit::AuditSeverity::Warning,
774                    "role_rate_limiter".to_string(),
775                    format!(
776                        "Role {} from IP {} blocked (cooldown until {})",
777                        role_key,
778                        client_ip,
779                        cooldown_end.format("%Y-%m-%d %H:%M:%S UTC")
780                    ),
781                )
782                .with_client_ip(client_ip.to_string());
783                let _ = self.audit_logger.log(audit_event).await;
784
785                return Ok(true); // Still rate limited
786            }
787            // Cooldown expired, reset state
788            state.in_cooldown = false;
789            state.cooldown_ends_at = None;
790            state.current_requests = 0;
791        }
792
793        // Check if we're in a new time window
794        let window_duration = chrono::Duration::minutes(role_config.window_duration_minutes as i64);
795
796        // Reset counter if we've moved to a new window
797        if let Some(last_window_start) = state.last_window_start {
798            if now.signed_duration_since(last_window_start) >= window_duration {
799                state.current_requests = 0;
800                state.last_window_start = Some(now);
801            }
802        } else {
803            state.last_window_start = Some(now);
804        }
805
806        state.current_requests += 1;
807        state.total_requests += 1;
808
809        // Check if we've exceeded the limit (including burst allowance)
810        let effective_limit = role_config.max_requests_per_window + role_config.burst_allowance;
811        if state.current_requests > effective_limit {
812            // Enter cooldown
813            state.in_cooldown = true;
814            state.cooldown_ends_at =
815                Some(now + chrono::Duration::minutes(role_config.cooldown_duration_minutes as i64));
816            state.blocked_requests += 1;
817
818            // Log rate limiting event
819            let audit_event = crate::audit::AuditEvent::new(
820                crate::audit::AuditEventType::AuthRateLimited,
821                crate::audit::AuditSeverity::Warning,
822                "role_rate_limiter".to_string(),
823                format!(
824                    "Role {} from IP {} rate limited for {} minutes after {} requests",
825                    role_key,
826                    client_ip,
827                    role_config.cooldown_duration_minutes,
828                    state.current_requests
829                ),
830            )
831            .with_client_ip(client_ip.to_string());
832            let _ = self.audit_logger.log(audit_event).await;
833
834            warn!(
835                "Role {} from IP {} rate limited for {} minutes after {} requests",
836                role_key, client_ip, role_config.cooldown_duration_minutes, state.current_requests
837            );
838
839            return Ok(true); // Rate limited
840        }
841
842        // Log successful request
843        if state.current_requests % 100 == 0 {
844            // Log every 100th request to avoid spam
845            let audit_event = crate::audit::AuditEvent::new(
846                crate::audit::AuditEventType::AuthSuccess,
847                crate::audit::AuditSeverity::Info,
848                "role_rate_limiter".to_string(),
849                format!(
850                    "Role {} from IP {} processed {} requests in window",
851                    role_key, client_ip, state.current_requests
852                ),
853            )
854            .with_client_ip(client_ip.to_string());
855            let _ = self.audit_logger.log(audit_event).await;
856        }
857
858        Ok(false) // Not rate limited
859    }
860
861    /// Get a consistent role key for rate limiting
862    fn get_role_key(&self, role: &Role) -> String {
863        match role {
864            Role::Admin => "admin".to_string(),
865            Role::Operator => "operator".to_string(),
866            Role::Monitor => "monitor".to_string(),
867            Role::Device { .. } => "device".to_string(),
868            Role::Custom { .. } => "custom".to_string(),
869        }
870    }
871
872    /// Update role rate limit configuration
873    pub async fn update_role_rate_limit(
874        &self,
875        role_key: String,
876        config: RoleRateLimitConfig,
877    ) -> Result<(), AuthError> {
878        // This would typically require updating the configuration file
879        // For now, we'll just log the change since ValidationConfig is not mutable
880        warn!(
881            "Role rate limit update requested for '{}' but configuration is immutable",
882            role_key
883        );
884
885        // Log configuration change
886        let audit_event = crate::audit::AuditEvent::new(
887            crate::audit::AuditEventType::SystemStartup,
888            crate::audit::AuditSeverity::Info,
889            "role_rate_limiter".to_string(),
890            format!(
891                "Rate limit configuration update requested for role '{}' (max_requests: {}, window: {} min)",
892                role_key, config.max_requests_per_window, config.window_duration_minutes
893            ),
894        );
895        let _ = self.audit_logger.log(audit_event).await;
896
897        Ok(())
898    }
899
900    /// Clean up old role rate limit entries
901    pub async fn cleanup_role_rate_limits(&self) {
902        let mut role_states = self.role_rate_limit_state.write().await;
903        let now = Utc::now();
904        let cleanup_threshold = chrono::Duration::hours(24); // Remove entries older than 24 hours
905
906        let mut total_removed = 0;
907
908        for (_role_key, ip_states) in role_states.iter_mut() {
909            let initial_count = ip_states.len();
910            ip_states.retain(|_ip, state| {
911                // Keep if in cooldown
912                if let Some(cooldown_end) = state.cooldown_ends_at {
913                    if now < cooldown_end {
914                        return true;
915                    }
916                }
917
918                // Keep if window started recently
919                if let Some(window_start) = state.last_window_start {
920                    if now.signed_duration_since(window_start) < cleanup_threshold {
921                        return true;
922                    }
923                }
924
925                // Remove old inactive entries
926                false
927            });
928
929            let removed = initial_count - ip_states.len();
930            total_removed += removed;
931        }
932
933        // Remove empty role entries
934        role_states.retain(|_role, ip_states| !ip_states.is_empty());
935
936        if total_removed > 0 {
937            debug!("Cleaned up {} old role rate limit entries", total_removed);
938        }
939    }
940
941    /// Refresh the in-memory cache from storage
942    async fn refresh_cache(&self) -> Result<(), AuthError> {
943        let keys = self
944            .storage
945            .load_keys()
946            .await
947            .map_err(|e| AuthError::Storage(e.to_string()))?;
948
949        let mut cache = self.api_keys_cache.write().await;
950        *cache = keys;
951
952        debug!("Refreshed cache with {} keys", cache.len());
953        Ok(())
954    }
955
956    /// Disable/enable an API key without deleting it
957    pub async fn disable_key(&self, key_id: &str) -> Result<bool, AuthError> {
958        let mut key = match self.get_key(key_id).await {
959            Some(key) => key,
960            None => return Ok(false),
961        };
962
963        key.active = false;
964        self.update_key(key).await?;
965
966        info!("Disabled API key: {}", key_id);
967        Ok(true)
968    }
969
970    /// Enable a previously disabled API key
971    pub async fn enable_key(&self, key_id: &str) -> Result<bool, AuthError> {
972        let mut key = match self.get_key(key_id).await {
973            Some(key) => key,
974            None => return Ok(false),
975        };
976
977        key.active = true;
978        self.update_key(key).await?;
979
980        info!("Enabled API key: {}", key_id);
981        Ok(true)
982    }
983
984    /// Update key expiration date
985    pub async fn update_key_expiration(
986        &self,
987        key_id: &str,
988        expires_at: Option<DateTime<Utc>>,
989    ) -> Result<bool, AuthError> {
990        let mut key = match self.get_key(key_id).await {
991            Some(key) => key,
992            None => return Ok(false),
993        };
994
995        key.expires_at = expires_at;
996        self.update_key(key).await?;
997
998        info!("Updated expiration for API key: {}", key_id);
999        Ok(true)
1000    }
1001
1002    /// Update key IP whitelist
1003    pub async fn update_key_ip_whitelist(
1004        &self,
1005        key_id: &str,
1006        ip_whitelist: Vec<String>,
1007    ) -> Result<bool, AuthError> {
1008        let mut key = match self.get_key(key_id).await {
1009            Some(key) => key,
1010            None => return Ok(false),
1011        };
1012
1013        key.ip_whitelist = ip_whitelist;
1014        self.update_key(key).await?;
1015
1016        info!("Updated IP whitelist for API key: {}", key_id);
1017        Ok(true)
1018    }
1019
1020    /// Get keys by role
1021    pub async fn list_keys_by_role(&self, role: &Role) -> Vec<ApiKey> {
1022        let cache = self.api_keys_cache.read().await;
1023        cache
1024            .values()
1025            .filter(|key| &key.role == role)
1026            .cloned()
1027            .collect()
1028    }
1029
1030    /// Get active keys only
1031    pub async fn list_active_keys(&self) -> Vec<ApiKey> {
1032        let cache = self.api_keys_cache.read().await;
1033        cache
1034            .values()
1035            .filter(|key| key.active && !key.is_expired())
1036            .cloned()
1037            .collect()
1038    }
1039
1040    /// Get expired keys
1041    pub async fn list_expired_keys(&self) -> Vec<ApiKey> {
1042        let cache = self.api_keys_cache.read().await;
1043        cache
1044            .values()
1045            .filter(|key| key.is_expired())
1046            .cloned()
1047            .collect()
1048    }
1049
1050    /// Bulk revoke keys (useful for security incidents)
1051    pub async fn bulk_revoke_keys(&self, key_ids: &[String]) -> Result<Vec<String>, AuthError> {
1052        let mut revoked = Vec::new();
1053
1054        for key_id in key_ids {
1055            match self.revoke_key(key_id).await {
1056                Ok(true) => revoked.push(key_id.clone()),
1057                Ok(false) => debug!("Key {} was already revoked or not found", key_id),
1058                Err(e) => error!("Failed to revoke key {}: {}", key_id, e),
1059            }
1060        }
1061
1062        info!("Bulk revoked {} keys", revoked.len());
1063        Ok(revoked)
1064    }
1065
1066    /// Clean up expired keys
1067    pub async fn cleanup_expired_keys(&self) -> Result<u32, AuthError> {
1068        let expired_keys = self.list_expired_keys().await;
1069        let key_ids: Vec<String> = expired_keys.iter().map(|k| k.id.clone()).collect();
1070
1071        let revoked = self.bulk_revoke_keys(&key_ids).await?;
1072
1073        info!("Cleaned up {} expired keys", revoked.len());
1074        Ok(revoked.len() as u32)
1075    }
1076
1077    /// Get key usage statistics
1078    pub async fn get_key_usage_stats(&self) -> Result<KeyUsageStats, AuthError> {
1079        let cache = self.api_keys_cache.read().await;
1080        let mut stats = KeyUsageStats::default();
1081
1082        for key in cache.values() {
1083            stats.total_keys += 1;
1084
1085            if key.active {
1086                stats.active_keys += 1;
1087            } else {
1088                stats.disabled_keys += 1;
1089            }
1090
1091            if key.is_expired() {
1092                stats.expired_keys += 1;
1093            }
1094
1095            stats.total_usage_count += key.usage_count;
1096
1097            // Track by role
1098            match &key.role {
1099                Role::Admin => stats.admin_keys += 1,
1100                Role::Operator => stats.operator_keys += 1,
1101                Role::Monitor => stats.monitor_keys += 1,
1102                Role::Device { .. } => stats.device_keys += 1,
1103                Role::Custom { .. } => stats.custom_keys += 1,
1104            }
1105        }
1106
1107        Ok(stats)
1108    }
1109
1110    /// Create multiple API keys for bulk provisioning
1111    pub async fn bulk_create_keys(
1112        &self,
1113        requests: Vec<KeyCreationRequest>,
1114    ) -> Result<Vec<Result<ApiKey, AuthError>>, AuthError> {
1115        let mut results = Vec::new();
1116
1117        for request in requests {
1118            let result = self
1119                .create_api_key(
1120                    request.name,
1121                    request.role,
1122                    request.expires_at,
1123                    request.ip_whitelist,
1124                )
1125                .await;
1126            results.push(result);
1127        }
1128
1129        Ok(results)
1130    }
1131
1132    /// Check if the authentication manager has all required methods for production use
1133    pub fn check_api_completeness(&self) -> ApiCompletenessCheck {
1134        ApiCompletenessCheck {
1135            has_create_key: true,
1136            has_validate_key: true,
1137            has_list_keys: true,
1138            has_revoke_key: true,
1139            has_update_key: true,
1140            has_bulk_operations: true,
1141            has_role_based_access: true,
1142            has_rate_limiting: true,
1143            has_ip_whitelisting: true,
1144            has_expiration_support: true,
1145            has_usage_tracking: true,
1146            framework_version: env!("CARGO_PKG_VERSION").to_string(),
1147            production_ready: true,
1148        }
1149    }
1150
1151    pub async fn start_background_tasks(&self) -> Result<(), AuthError> {
1152        Ok(())
1153    }
1154
1155    pub async fn stop_background_tasks(&self) -> Result<(), AuthError> {
1156        Ok(())
1157    }
1158
1159    pub async fn health_check(&self) -> Result<(), AuthError> {
1160        Ok(())
1161    }
1162
1163    pub async fn process_request(
1164        &self,
1165        request: Request,
1166        _context: &RequestContext,
1167    ) -> Result<Request, AuthError> {
1168        if !self.config.enabled {
1169            return Ok(request);
1170        }
1171
1172        // For now, just pass through - implement authentication logic later
1173        Ok(request)
1174    }
1175
1176    pub async fn process_response(
1177        &self,
1178        response: Response,
1179        _context: &RequestContext,
1180    ) -> Result<Response, AuthError> {
1181        Ok(response)
1182    }
1183
1184    // JWT Token-based Authentication Methods
1185
1186    /// Generate a JWT token pair for an API key
1187    pub async fn generate_token_for_key(
1188        &self,
1189        key_id: &str,
1190        client_ip: Option<String>,
1191        session_id: Option<String>,
1192        scope: Vec<String>,
1193    ) -> Result<TokenPair, AuthError> {
1194        // Get the API key
1195        let key = self
1196            .get_key(key_id)
1197            .await
1198            .ok_or_else(|| AuthError::Failed("API key not found".to_string()))?;
1199
1200        // Verify key is valid
1201        if !key.is_valid() {
1202            return Err(AuthError::Failed(
1203                "API key is invalid or expired".to_string(),
1204            ));
1205        }
1206
1207        // Generate token pair
1208        let token_pair = self
1209            .jwt_manager
1210            .generate_token_pair(
1211                key.id.clone(),
1212                vec![key.role.clone()],
1213                Some(key.id.clone()),
1214                client_ip.clone(),
1215                session_id.clone(),
1216                scope,
1217            )
1218            .await
1219            .map_err(|e| AuthError::Failed(format!("Token generation failed: {e}")))?;
1220
1221        // Log token generation
1222        let audit_event = AuditEvent::new(
1223            AuditEventType::KeyUsed,
1224            AuditSeverity::Info,
1225            "jwt".to_string(),
1226            format!("JWT token pair generated for key {}", key.id),
1227        )
1228        .with_resource(key.id.clone())
1229        .with_client_ip(client_ip.unwrap_or_else(|| "unknown".to_string()));
1230
1231        let _ = self.audit_logger.log(audit_event).await;
1232
1233        Ok(token_pair)
1234    }
1235
1236    /// Validate a JWT token and return auth context
1237    pub async fn validate_jwt_token(&self, token: &str) -> Result<AuthContext, AuthError> {
1238        let auth_context = self
1239            .jwt_manager
1240            .token_to_auth_context(token)
1241            .await
1242            .map_err(|e| match e {
1243                crate::jwt::JwtError::Expired => AuthError::Failed("Token expired".to_string()),
1244                crate::jwt::JwtError::InvalidFormat => {
1245                    AuthError::Failed("Invalid token format".to_string())
1246                }
1247                _ => AuthError::Failed(format!("Token validation failed: {}", e)),
1248            })?;
1249
1250        // Log successful token validation
1251        let audit_event = AuditEvent::new(
1252            AuditEventType::AuthSuccess,
1253            AuditSeverity::Info,
1254            "jwt".to_string(),
1255            format!("JWT token validated for user {:?}", auth_context.user_id),
1256        );
1257
1258        if let Some(ref user_id) = auth_context.user_id {
1259            let audit_event = audit_event.with_actor(user_id.clone());
1260            let _ = self.audit_logger.log(audit_event).await;
1261        }
1262
1263        Ok(auth_context)
1264    }
1265
1266    /// Refresh an access token using a refresh token
1267    pub async fn refresh_jwt_token(
1268        &self,
1269        refresh_token: &str,
1270        client_ip: Option<String>,
1271        scope: Vec<String>,
1272    ) -> Result<String, AuthError> {
1273        // First validate the refresh token to get the key ID
1274        let token_info = self
1275            .jwt_manager
1276            .validate_token(refresh_token)
1277            .await
1278            .map_err(|e| AuthError::Failed(format!("Invalid refresh token: {}", e)))?;
1279
1280        // Get current roles from the associated API key
1281        let roles = if let Some(key_id) = &token_info.claims.key_id {
1282            let key = self
1283                .get_key(key_id)
1284                .await
1285                .ok_or_else(|| AuthError::Failed("Associated API key not found".to_string()))?;
1286
1287            if !key.is_valid() {
1288                return Err(AuthError::Failed(
1289                    "Associated API key is invalid or expired".to_string(),
1290                ));
1291            }
1292
1293            vec![key.role.clone()]
1294        } else {
1295            // Fallback to stored roles if no key ID
1296            token_info.claims.roles
1297        };
1298
1299        // Generate new access token
1300        let access_token = self
1301            .jwt_manager
1302            .refresh_access_token(refresh_token, roles, client_ip.clone(), scope)
1303            .await
1304            .map_err(|e| AuthError::Failed(format!("Token refresh failed: {}", e)))?;
1305
1306        // Log token refresh
1307        let audit_event = AuditEvent::new(
1308            AuditEventType::KeyUsed,
1309            AuditSeverity::Info,
1310            "jwt".to_string(),
1311            format!(
1312                "JWT access token refreshed for subject {}",
1313                token_info.claims.sub
1314            ),
1315        )
1316        .with_actor(token_info.claims.sub)
1317        .with_client_ip(client_ip.unwrap_or_else(|| "unknown".to_string()));
1318
1319        let _ = self.audit_logger.log(audit_event).await;
1320
1321        Ok(access_token)
1322    }
1323
1324    /// Revoke a JWT token
1325    pub async fn revoke_jwt_token(&self, token: &str) -> Result<(), AuthError> {
1326        self.jwt_manager
1327            .revoke_token(token)
1328            .await
1329            .map_err(|e| AuthError::Failed(format!("Token revocation failed: {}", e)))?;
1330
1331        // Log token revocation
1332        let audit_event = AuditEvent::new(
1333            AuditEventType::SecurityViolation,
1334            AuditSeverity::Warning,
1335            "jwt".to_string(),
1336            "JWT token revoked".to_string(),
1337        );
1338
1339        let _ = self.audit_logger.log(audit_event).await;
1340
1341        Ok(())
1342    }
1343
1344    /// Clean up expired tokens from blacklist
1345    pub async fn cleanup_jwt_blacklist(&self) -> Result<usize, AuthError> {
1346        let cleaned = self.jwt_manager.cleanup_blacklist().await;
1347
1348        if cleaned > 0 {
1349            let audit_event = AuditEvent::new(
1350                AuditEventType::SystemStartup,
1351                AuditSeverity::Info,
1352                "jwt".to_string(),
1353                format!("Cleaned up {} expired tokens from blacklist", cleaned),
1354            );
1355
1356            let _ = self.audit_logger.log(audit_event).await;
1357        }
1358
1359        Ok(cleaned)
1360    }
1361
1362    /// Get token info without validation (for debugging)
1363    pub fn decode_jwt_token_info(&self, token: &str) -> Result<crate::jwt::TokenClaims, AuthError> {
1364        self.jwt_manager
1365            .decode_token_info(token)
1366            .map_err(|e| AuthError::Failed(format!("Token decoding failed: {}", e)))
1367    }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372    use super::*;
1373    use crate::config::{AuthConfig, StorageConfig};
1374    use crate::models::Role;
1375    use tokio;
1376
1377    fn create_test_config() -> AuthConfig {
1378        AuthConfig {
1379            storage: StorageConfig::Memory,
1380            enabled: true,
1381            cache_size: 100,
1382            session_timeout_secs: 3600,
1383            max_failed_attempts: 3,
1384            rate_limit_window_secs: 300,
1385        }
1386    }
1387
1388    #[allow(dead_code)]
1389    fn create_test_validation_config() -> ValidationConfig {
1390        ValidationConfig {
1391            max_failed_attempts: 3,
1392            failed_attempt_window_minutes: 15,
1393            block_duration_minutes: 30,
1394            session_timeout_minutes: 60,
1395            strict_ip_validation: false,
1396            enable_role_based_rate_limiting: false,
1397            role_rate_limits: HashMap::new(),
1398        }
1399    }
1400
1401    #[tokio::test]
1402    async fn test_auth_manager_creation() {
1403        let config = create_test_config();
1404
1405        let result = AuthenticationManager::new(config).await;
1406        assert!(result.is_ok());
1407    }
1408
1409    #[tokio::test]
1410    async fn test_create_api_key() {
1411        let config = create_test_config();
1412        let manager = AuthenticationManager::new(config).await.unwrap();
1413
1414        let result = manager
1415            .create_api_key("Test Key".to_string(), Role::Monitor, None, None)
1416            .await;
1417        assert!(result.is_ok());
1418
1419        let key = result.unwrap();
1420        assert_eq!(key.name, "Test Key");
1421        assert!(key.id.starts_with("lmcp_"));
1422        assert_eq!(key.role, Role::Monitor);
1423    }
1424}