1use crate::{
4 audit::{AuditConfig, AuditEvent, AuditEventType, AuditLogger, AuditSeverity, events},
5 config::AuthConfig,
6 jwt::{JwtConfig, JwtManager, TokenPair},
7 models::*,
8 storage::{StorageBackend, create_storage_backend},
9};
10use chrono::{DateTime, Utc};
11use pulseengine_mcp_protocol::{Request, Response};
12use std::collections::HashMap;
13use std::sync::Arc;
14use thiserror::Error;
15use tokio::sync::RwLock;
16use tracing::{debug, error, info, warn};
17
18#[derive(Debug, Clone)]
20pub struct RequestContext {
21 pub user_id: Option<String>,
22 pub roles: Vec<Role>,
23}
24
25#[derive(Debug, Error, serde::Serialize)]
26pub enum AuthError {
27 #[error("Authentication failed: {0}")]
28 Failed(String),
29
30 #[error("Configuration error: {0}")]
31 Config(String),
32
33 #[error("Storage error: {0}")]
34 Storage(String),
35
36 #[error("Validation error: {0}")]
37 Validation(String),
38}
39
40pub struct AuthenticationManager {
42 config: AuthConfig,
43 validation_config: ValidationConfig,
45 storage: Arc<dyn StorageBackend>,
47 api_keys_cache: Arc<RwLock<std::collections::HashMap<String, ApiKey>>>,
49 rate_limit_state: Arc<RwLock<HashMap<String, RateLimitState>>>,
51 role_rate_limit_state: Arc<RwLock<HashMap<String, HashMap<String, RoleRateLimitStats>>>>,
53 audit_logger: Arc<AuditLogger>,
55 jwt_manager: Arc<JwtManager>,
57}
58
59#[derive(Debug, Clone)]
61pub struct RateLimitState {
62 pub failed_attempts: u32,
64 pub window_start: DateTime<Utc>,
66 pub blocked_until: Option<DateTime<Utc>>,
68 pub successful_requests: u32,
70 pub success_window_start: DateTime<Utc>,
72}
73
74#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
76pub struct RoleRateLimitConfig {
77 pub max_requests_per_window: u32,
79 pub window_duration_minutes: u64,
81 pub burst_allowance: u32,
83 pub cooldown_duration_minutes: u64,
85}
86
87#[derive(Debug, Clone)]
89pub struct ValidationConfig {
90 pub max_failed_attempts: u32,
92 pub failed_attempt_window_minutes: u64,
94 pub block_duration_minutes: u64,
96 pub session_timeout_minutes: u64,
98 pub strict_ip_validation: bool,
100 pub enable_role_based_rate_limiting: bool,
102 pub role_rate_limits: std::collections::HashMap<String, RoleRateLimitConfig>,
104}
105
106impl Default for ValidationConfig {
107 fn default() -> Self {
108 let mut role_rate_limits = std::collections::HashMap::new();
109
110 role_rate_limits.insert(
112 "admin".to_string(),
113 RoleRateLimitConfig {
114 max_requests_per_window: 1000,
115 window_duration_minutes: 60,
116 burst_allowance: 100,
117 cooldown_duration_minutes: 5,
118 },
119 );
120
121 role_rate_limits.insert(
122 "operator".to_string(),
123 RoleRateLimitConfig {
124 max_requests_per_window: 500,
125 window_duration_minutes: 60,
126 burst_allowance: 50,
127 cooldown_duration_minutes: 10,
128 },
129 );
130
131 role_rate_limits.insert(
132 "monitor".to_string(),
133 RoleRateLimitConfig {
134 max_requests_per_window: 200,
135 window_duration_minutes: 60,
136 burst_allowance: 20,
137 cooldown_duration_minutes: 15,
138 },
139 );
140
141 role_rate_limits.insert(
142 "device".to_string(),
143 RoleRateLimitConfig {
144 max_requests_per_window: 100,
145 window_duration_minutes: 60,
146 burst_allowance: 10,
147 cooldown_duration_minutes: 20,
148 },
149 );
150
151 role_rate_limits.insert(
152 "custom".to_string(),
153 RoleRateLimitConfig {
154 max_requests_per_window: 50,
155 window_duration_minutes: 60,
156 burst_allowance: 5,
157 cooldown_duration_minutes: 30,
158 },
159 );
160
161 Self {
162 max_failed_attempts: 4,
163 failed_attempt_window_minutes: 15,
164 block_duration_minutes: 30,
165 session_timeout_minutes: 480, strict_ip_validation: true,
167 enable_role_based_rate_limiting: true,
168 role_rate_limits,
169 }
170 }
171}
172
173#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
175pub struct RateLimitStats {
176 pub total_tracked_ips: usize,
178 pub currently_blocked_ips: u32,
180 pub total_failed_attempts: u64,
182 pub role_stats: std::collections::HashMap<String, RoleRateLimitStats>,
184}
185
186#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
188pub struct RoleRateLimitStats {
189 pub current_requests: u32,
191 pub blocked_requests: u64,
193 pub total_requests: u64,
195 pub in_cooldown: bool,
197 pub cooldown_ends_at: Option<DateTime<Utc>>,
199 pub last_window_start: Option<DateTime<Utc>>,
201}
202
203impl AuthenticationManager {
204 pub async fn new(config: AuthConfig) -> Result<Self, AuthError> {
205 let storage = create_storage_backend(&config.storage)
207 .await
208 .map_err(|e| AuthError::Storage(e.to_string()))?;
209
210 let audit_config = AuditConfig::default();
212 let audit_logger =
213 Arc::new(AuditLogger::new(audit_config).await.map_err(|e| {
214 AuthError::Config(format!("Failed to initialize audit logger: {}", e))
215 })?);
216
217 let jwt_config = JwtConfig::default();
219 let jwt_manager =
220 Arc::new(JwtManager::new(jwt_config).map_err(|e| {
221 AuthError::Config(format!("Failed to initialize JWT manager: {}", e))
222 })?);
223
224 let manager = Self {
225 storage,
226 validation_config: ValidationConfig::default(),
227 api_keys_cache: Arc::new(RwLock::new(HashMap::new())),
228 rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
229 role_rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
230 audit_logger,
231 jwt_manager,
232 config,
233 };
234
235 manager.refresh_cache().await?;
237
238 let startup_event = AuditEvent::new(
240 AuditEventType::SystemStartup,
241 AuditSeverity::Info,
242 "auth_manager".to_string(),
243 "Authentication manager initialized successfully".to_string(),
244 );
245 let _ = manager.audit_logger.log(startup_event).await;
246
247 info!("Authentication manager initialized successfully");
248 Ok(manager)
249 }
250
251 pub async fn new_with_validation(
252 config: AuthConfig,
253 validation_config: ValidationConfig,
254 ) -> Result<Self, AuthError> {
255 let storage = create_storage_backend(&config.storage)
257 .await
258 .map_err(|e| AuthError::Storage(e.to_string()))?;
259
260 let audit_config = AuditConfig::default();
262 let audit_logger =
263 Arc::new(AuditLogger::new(audit_config).await.map_err(|e| {
264 AuthError::Config(format!("Failed to initialize audit logger: {}", e))
265 })?);
266
267 let jwt_config = JwtConfig::default();
269 let jwt_manager =
270 Arc::new(JwtManager::new(jwt_config).map_err(|e| {
271 AuthError::Config(format!("Failed to initialize JWT manager: {}", e))
272 })?);
273
274 let manager = Self {
275 storage,
276 validation_config,
277 api_keys_cache: Arc::new(RwLock::new(HashMap::new())),
278 rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
279 role_rate_limit_state: Arc::new(RwLock::new(HashMap::new())),
280 audit_logger,
281 jwt_manager,
282 config,
283 };
284
285 manager.refresh_cache().await?;
287
288 let startup_event = AuditEvent::new(
290 AuditEventType::SystemStartup,
291 AuditSeverity::Info,
292 "auth_manager".to_string(),
293 "Authentication manager initialized with custom validation config".to_string(),
294 );
295 let _ = manager.audit_logger.log(startup_event).await;
296
297 info!("Authentication manager initialized with custom validation config");
298 Ok(manager)
299 }
300
301 pub async fn create_api_key(
303 &self,
304 name: String,
305 role: Role,
306 expires_at: Option<DateTime<Utc>>,
307 ip_whitelist: Option<Vec<String>>,
308 ) -> Result<ApiKey, AuthError> {
309 let key = ApiKey::new(name, role, expires_at, ip_whitelist.unwrap_or_default());
310
311 self.storage
313 .save_key(&key)
314 .await
315 .map_err(|e| AuthError::Storage(e.to_string()))?;
316
317 {
319 let mut cache = self.api_keys_cache.write().await;
320 cache.insert(key.id.clone(), key.clone());
321 }
322
323 let audit_event = events::key_created(&key.id, "system", &key.role.to_string());
325 let _ = self.audit_logger.log(audit_event).await;
326
327 info!("Created new API key: {} ({})", key.id, key.name);
328 Ok(key)
329 }
330
331 pub async fn validate_api_key(
333 &self,
334 key_secret: &str,
335 client_ip: Option<&str>,
336 ) -> Result<Option<AuthContext>, AuthError> {
337 let client_ip = client_ip.unwrap_or("unknown");
338
339 if let Some(blocked_until) = self.check_rate_limit(client_ip).await {
341 let audit_event = AuditEvent::new(
343 AuditEventType::AuthRateLimited,
344 AuditSeverity::Warning,
345 "rate_limiter".to_string(),
346 format!(
347 "IP {} blocked due to rate limiting until {}",
348 client_ip,
349 blocked_until.format("%Y-%m-%d %H:%M:%S UTC")
350 ),
351 )
352 .with_client_ip(client_ip.to_string());
353 let _ = self.audit_logger.log(audit_event).await;
354
355 return Err(AuthError::Failed(format!(
356 "IP {} is rate limited until {}",
357 client_ip,
358 blocked_until.format("%Y-%m-%d %H:%M:%S UTC")
359 )));
360 }
361
362 let key = {
363 let cache = self.api_keys_cache.read().await;
364
365 cache
367 .values()
368 .find(|key| {
369 key.verify_key(key_secret).unwrap_or_default()
371 })
372 .cloned()
373 };
374
375 let key = match key {
376 Some(key) => key,
377 None => {
378 self.record_failed_attempt(client_ip).await;
379
380 let audit_event = events::auth_failure(client_ip, "Invalid API key");
382 let _ = self.audit_logger.log(audit_event).await;
383
384 return Err(AuthError::Failed("Invalid API key".to_string()));
385 }
386 };
387
388 if let Err(reason) = self.validate_key_security(&key, client_ip) {
390 self.record_failed_attempt(client_ip).await;
391
392 let audit_event = events::auth_failure(client_ip, &reason);
394 let _ = self.audit_logger.log(audit_event).await;
395
396 return Err(AuthError::Failed(reason));
397 }
398
399 if let Ok(is_rate_limited) = self.check_role_rate_limit(&key.role, client_ip).await {
401 if is_rate_limited {
402 self.record_failed_attempt(client_ip).await;
403
404 let audit_event = events::auth_failure(
406 client_ip,
407 &format!(
408 "Role-based rate limit exceeded for role {}",
409 self.get_role_key(&key.role)
410 ),
411 );
412 let _ = self.audit_logger.log(audit_event).await;
413
414 return Err(AuthError::Failed(format!(
415 "Rate limit exceeded for role {}",
416 self.get_role_key(&key.role)
417 )));
418 }
419 }
420
421 let mut updated_key = key.clone();
423
424 self.clear_failed_attempts(client_ip).await;
425
426 updated_key.mark_used();
428
429 if let Err(e) = self.storage.save_key(&updated_key).await {
431 warn!("Failed to update key usage statistics: {}", e);
432 } else {
433 let mut cache = self.api_keys_cache.write().await;
434 cache.insert(updated_key.id.clone(), updated_key.clone());
435 }
436
437 let auth_event = events::auth_success(&key.id, client_ip);
439 let _ = self.audit_logger.log(auth_event).await;
440
441 let key_usage_event = events::key_used(&key.id, client_ip);
442 let _ = self.audit_logger.log(key_usage_event).await;
443
444 Ok(Some(AuthContext {
446 user_id: Some(key.id.clone()),
447 roles: vec![key.role.clone()],
448 api_key_id: Some(key.id.clone()),
449 permissions: self.get_permissions_for_role(&key.role),
450 }))
451 }
452
453 pub async fn validate_api_key_legacy(
455 &self,
456 key_secret: &str,
457 ) -> Result<Option<AuthContext>, AuthError> {
458 self.validate_api_key(key_secret, None).await
459 }
460
461 pub async fn list_keys(&self) -> Vec<ApiKey> {
463 let cache = self.api_keys_cache.read().await;
464 cache.values().cloned().collect()
465 }
466
467 pub async fn get_key(&self, key_id: &str) -> Option<ApiKey> {
469 let cache = self.api_keys_cache.read().await;
470 cache.get(key_id).cloned()
471 }
472
473 pub async fn update_key(&self, key: ApiKey) -> Result<(), AuthError> {
475 self.storage
477 .save_key(&key)
478 .await
479 .map_err(|e| AuthError::Storage(e.to_string()))?;
480
481 {
483 let mut cache = self.api_keys_cache.write().await;
484 cache.insert(key.id.clone(), key.clone());
485 }
486
487 debug!("Updated API key: {}", key.id);
488 Ok(())
489 }
490
491 pub async fn revoke_key(&self, key_id: &str) -> Result<bool, AuthError> {
493 self.storage
495 .delete_key(key_id)
496 .await
497 .map_err(|e| AuthError::Storage(e.to_string()))?;
498
499 let removed = {
501 let mut cache = self.api_keys_cache.write().await;
502 cache.remove(key_id).is_some()
503 };
504
505 if removed {
506 info!("Revoked API key: {}", key_id);
507 } else {
508 warn!("Attempted to revoke non-existent key: {}", key_id);
509 }
510
511 Ok(removed)
512 }
513
514 async fn check_rate_limit(&self, client_ip: &str) -> Option<DateTime<Utc>> {
516 let rate_limits = self.rate_limit_state.read().await;
517
518 if let Some(state) = rate_limits.get(client_ip) {
519 if let Some(blocked_until) = state.blocked_until {
520 if Utc::now() < blocked_until {
521 return Some(blocked_until);
522 }
523 }
524 }
525
526 None
527 }
528
529 async fn record_failed_attempt(&self, client_ip: &str) {
531 let mut rate_limits = self.rate_limit_state.write().await;
532 let now = Utc::now();
533
534 let state = rate_limits
535 .entry(client_ip.to_string())
536 .or_insert_with(|| RateLimitState {
537 failed_attempts: 0,
538 window_start: now,
539 blocked_until: None,
540 successful_requests: 0,
541 success_window_start: now,
542 });
543
544 let window_duration =
546 chrono::Duration::minutes(self.validation_config.failed_attempt_window_minutes as i64);
547 if now - state.window_start > window_duration {
548 state.failed_attempts = 1;
550 state.window_start = now;
551 state.blocked_until = None;
552 } else {
553 state.failed_attempts += 1;
555
556 if state.failed_attempts >= self.validation_config.max_failed_attempts {
558 let block_duration =
559 chrono::Duration::minutes(self.validation_config.block_duration_minutes as i64);
560 state.blocked_until = Some(now + block_duration);
561
562 warn!(
563 "IP {} blocked for {} minutes after {} failed attempts",
564 client_ip, self.validation_config.block_duration_minutes, state.failed_attempts
565 );
566 }
567 }
568
569 debug!(
570 "Failed attempt #{} from IP {} (window started: {})",
571 state.failed_attempts, client_ip, state.window_start
572 );
573 }
574
575 async fn clear_failed_attempts(&self, client_ip: &str) {
577 let mut rate_limits = self.rate_limit_state.write().await;
578 if rate_limits.remove(client_ip).is_some() {
579 debug!("Cleared failed attempts for IP: {}", client_ip);
580 }
581 }
582
583 fn validate_key_security(&self, key: &ApiKey, client_ip: &str) -> Result<(), String> {
585 if !key.active {
587 return Err("API key is disabled".to_string());
588 }
589
590 if let Some(expires_at) = key.expires_at {
592 if Utc::now() > expires_at {
593 return Err("API key has expired".to_string());
594 }
595 }
596
597 if self.validation_config.strict_ip_validation && !key.ip_whitelist.is_empty() {
599 let is_ip_allowed = key.ip_whitelist.iter().any(|allowed_ip| {
600 allowed_ip == client_ip || allowed_ip == "*"
602 });
603
604 if !is_ip_allowed {
605 return Err(format!("IP address {client_ip} not allowed for this key"));
606 }
607 }
608
609 Ok(())
610 }
611
612 fn get_permissions_for_role(&self, role: &Role) -> Vec<String> {
614 match role {
615 Role::Admin => vec![
616 "admin.*".to_string(),
617 "device.*".to_string(),
618 "system.*".to_string(),
619 "mcp.*".to_string(),
620 ],
621 Role::Operator => vec![
622 "device.*".to_string(),
623 "system.status".to_string(),
624 "mcp.tools.*".to_string(),
625 "mcp.resources.read".to_string(),
626 ],
627 Role::Monitor => vec![
628 "device.read".to_string(),
629 "system.status".to_string(),
630 "mcp.resources.read".to_string(),
631 ],
632 Role::Device { allowed_devices } => allowed_devices
633 .iter()
634 .map(|device| format!("device.{device}"))
635 .collect(),
636 Role::Custom { permissions } => permissions.clone(),
637 }
638 }
639
640 pub async fn get_rate_limit_stats(&self) -> RateLimitStats {
642 let rate_limits = self.rate_limit_state.read().await;
643 let role_states = self.role_rate_limit_state.read().await;
644 let now = Utc::now();
645
646 let mut stats = RateLimitStats {
647 total_tracked_ips: rate_limits.len(),
648 currently_blocked_ips: 0,
649 total_failed_attempts: 0,
650 role_stats: std::collections::HashMap::new(),
651 };
652
653 for state in rate_limits.values() {
654 stats.total_failed_attempts += state.failed_attempts as u64;
655
656 if let Some(blocked_until) = state.blocked_until {
657 if now < blocked_until {
658 stats.currently_blocked_ips += 1;
659 }
660 }
661 }
662
663 for (role_key, ip_states) in role_states.iter() {
665 let mut role_statistics = RoleRateLimitStats {
666 current_requests: 0,
667 blocked_requests: 0,
668 total_requests: 0,
669 in_cooldown: false,
670 cooldown_ends_at: None,
671 last_window_start: None,
672 };
673
674 for state in ip_states.values() {
675 role_statistics.current_requests += state.current_requests;
676 role_statistics.blocked_requests += state.blocked_requests;
677 role_statistics.total_requests += state.total_requests;
678
679 if let Some(cooldown_end) = state.cooldown_ends_at {
681 if now < cooldown_end {
682 role_statistics.in_cooldown = true;
683 if role_statistics.cooldown_ends_at.is_none()
684 || cooldown_end > role_statistics.cooldown_ends_at.unwrap()
685 {
686 role_statistics.cooldown_ends_at = Some(cooldown_end);
687 }
688 }
689 }
690 }
691
692 stats.role_stats.insert(role_key.clone(), role_statistics);
693 }
694
695 stats
696 }
697
698 pub async fn cleanup_rate_limits(&self) {
700 let mut rate_limits = self.rate_limit_state.write().await;
701 let now = Utc::now();
702 let cleanup_threshold = chrono::Duration::hours(24); let initial_count = rate_limits.len();
705 rate_limits.retain(|_ip, state| {
706 if let Some(blocked_until) = state.blocked_until {
708 if now < blocked_until {
709 return true;
710 }
711 }
712
713 now - state.window_start < cleanup_threshold
715 });
716
717 let removed_count = initial_count - rate_limits.len();
718 if removed_count > 0 {
719 debug!("Cleaned up {} old rate limit entries", removed_count);
720 }
721 }
722
723 pub async fn check_role_rate_limit(
727 &self,
728 role: &Role,
729 client_ip: &str,
730 ) -> Result<bool, AuthError> {
731 if !self.validation_config.enable_role_based_rate_limiting {
732 return Ok(false); }
734
735 let role_key = self.get_role_key(role);
736 let role_config = match self.validation_config.role_rate_limits.get(&role_key) {
737 Some(config) => config.clone(),
738 None => {
739 warn!(
741 "No rate limit config found for role '{}', using default",
742 role_key
743 );
744 return Ok(false);
745 }
746 };
747
748 let mut role_states = self.role_rate_limit_state.write().await;
749 let role_state_map = role_states
750 .entry(role_key.clone())
751 .or_insert_with(HashMap::new);
752
753 let now = Utc::now();
754 let state = role_state_map
755 .entry(client_ip.to_string())
756 .or_insert_with(|| RoleRateLimitStats {
757 current_requests: 0,
758 blocked_requests: 0,
759 total_requests: 0,
760 in_cooldown: false,
761 cooldown_ends_at: None,
762 last_window_start: None,
763 });
764
765 if let Some(cooldown_end) = state.cooldown_ends_at {
767 if now < cooldown_end {
768 state.blocked_requests += 1;
769
770 let audit_event = crate::audit::AuditEvent::new(
772 crate::audit::AuditEventType::AuthRateLimited,
773 crate::audit::AuditSeverity::Warning,
774 "role_rate_limiter".to_string(),
775 format!(
776 "Role {} from IP {} blocked (cooldown until {})",
777 role_key,
778 client_ip,
779 cooldown_end.format("%Y-%m-%d %H:%M:%S UTC")
780 ),
781 )
782 .with_client_ip(client_ip.to_string());
783 let _ = self.audit_logger.log(audit_event).await;
784
785 return Ok(true); }
787 state.in_cooldown = false;
789 state.cooldown_ends_at = None;
790 state.current_requests = 0;
791 }
792
793 let window_duration = chrono::Duration::minutes(role_config.window_duration_minutes as i64);
795
796 if let Some(last_window_start) = state.last_window_start {
798 if now.signed_duration_since(last_window_start) >= window_duration {
799 state.current_requests = 0;
800 state.last_window_start = Some(now);
801 }
802 } else {
803 state.last_window_start = Some(now);
804 }
805
806 state.current_requests += 1;
807 state.total_requests += 1;
808
809 let effective_limit = role_config.max_requests_per_window + role_config.burst_allowance;
811 if state.current_requests > effective_limit {
812 state.in_cooldown = true;
814 state.cooldown_ends_at =
815 Some(now + chrono::Duration::minutes(role_config.cooldown_duration_minutes as i64));
816 state.blocked_requests += 1;
817
818 let audit_event = crate::audit::AuditEvent::new(
820 crate::audit::AuditEventType::AuthRateLimited,
821 crate::audit::AuditSeverity::Warning,
822 "role_rate_limiter".to_string(),
823 format!(
824 "Role {} from IP {} rate limited for {} minutes after {} requests",
825 role_key,
826 client_ip,
827 role_config.cooldown_duration_minutes,
828 state.current_requests
829 ),
830 )
831 .with_client_ip(client_ip.to_string());
832 let _ = self.audit_logger.log(audit_event).await;
833
834 warn!(
835 "Role {} from IP {} rate limited for {} minutes after {} requests",
836 role_key, client_ip, role_config.cooldown_duration_minutes, state.current_requests
837 );
838
839 return Ok(true); }
841
842 if state.current_requests % 100 == 0 {
844 let audit_event = crate::audit::AuditEvent::new(
846 crate::audit::AuditEventType::AuthSuccess,
847 crate::audit::AuditSeverity::Info,
848 "role_rate_limiter".to_string(),
849 format!(
850 "Role {} from IP {} processed {} requests in window",
851 role_key, client_ip, state.current_requests
852 ),
853 )
854 .with_client_ip(client_ip.to_string());
855 let _ = self.audit_logger.log(audit_event).await;
856 }
857
858 Ok(false) }
860
861 fn get_role_key(&self, role: &Role) -> String {
863 match role {
864 Role::Admin => "admin".to_string(),
865 Role::Operator => "operator".to_string(),
866 Role::Monitor => "monitor".to_string(),
867 Role::Device { .. } => "device".to_string(),
868 Role::Custom { .. } => "custom".to_string(),
869 }
870 }
871
872 pub async fn update_role_rate_limit(
874 &self,
875 role_key: String,
876 config: RoleRateLimitConfig,
877 ) -> Result<(), AuthError> {
878 warn!(
881 "Role rate limit update requested for '{}' but configuration is immutable",
882 role_key
883 );
884
885 let audit_event = crate::audit::AuditEvent::new(
887 crate::audit::AuditEventType::SystemStartup,
888 crate::audit::AuditSeverity::Info,
889 "role_rate_limiter".to_string(),
890 format!(
891 "Rate limit configuration update requested for role '{}' (max_requests: {}, window: {} min)",
892 role_key, config.max_requests_per_window, config.window_duration_minutes
893 ),
894 );
895 let _ = self.audit_logger.log(audit_event).await;
896
897 Ok(())
898 }
899
900 pub async fn cleanup_role_rate_limits(&self) {
902 let mut role_states = self.role_rate_limit_state.write().await;
903 let now = Utc::now();
904 let cleanup_threshold = chrono::Duration::hours(24); let mut total_removed = 0;
907
908 for (_role_key, ip_states) in role_states.iter_mut() {
909 let initial_count = ip_states.len();
910 ip_states.retain(|_ip, state| {
911 if let Some(cooldown_end) = state.cooldown_ends_at {
913 if now < cooldown_end {
914 return true;
915 }
916 }
917
918 if let Some(window_start) = state.last_window_start {
920 if now.signed_duration_since(window_start) < cleanup_threshold {
921 return true;
922 }
923 }
924
925 false
927 });
928
929 let removed = initial_count - ip_states.len();
930 total_removed += removed;
931 }
932
933 role_states.retain(|_role, ip_states| !ip_states.is_empty());
935
936 if total_removed > 0 {
937 debug!("Cleaned up {} old role rate limit entries", total_removed);
938 }
939 }
940
941 async fn refresh_cache(&self) -> Result<(), AuthError> {
943 let keys = self
944 .storage
945 .load_keys()
946 .await
947 .map_err(|e| AuthError::Storage(e.to_string()))?;
948
949 let mut cache = self.api_keys_cache.write().await;
950 *cache = keys;
951
952 debug!("Refreshed cache with {} keys", cache.len());
953 Ok(())
954 }
955
956 pub async fn disable_key(&self, key_id: &str) -> Result<bool, AuthError> {
958 let mut key = match self.get_key(key_id).await {
959 Some(key) => key,
960 None => return Ok(false),
961 };
962
963 key.active = false;
964 self.update_key(key).await?;
965
966 info!("Disabled API key: {}", key_id);
967 Ok(true)
968 }
969
970 pub async fn enable_key(&self, key_id: &str) -> Result<bool, AuthError> {
972 let mut key = match self.get_key(key_id).await {
973 Some(key) => key,
974 None => return Ok(false),
975 };
976
977 key.active = true;
978 self.update_key(key).await?;
979
980 info!("Enabled API key: {}", key_id);
981 Ok(true)
982 }
983
984 pub async fn update_key_expiration(
986 &self,
987 key_id: &str,
988 expires_at: Option<DateTime<Utc>>,
989 ) -> Result<bool, AuthError> {
990 let mut key = match self.get_key(key_id).await {
991 Some(key) => key,
992 None => return Ok(false),
993 };
994
995 key.expires_at = expires_at;
996 self.update_key(key).await?;
997
998 info!("Updated expiration for API key: {}", key_id);
999 Ok(true)
1000 }
1001
1002 pub async fn update_key_ip_whitelist(
1004 &self,
1005 key_id: &str,
1006 ip_whitelist: Vec<String>,
1007 ) -> Result<bool, AuthError> {
1008 let mut key = match self.get_key(key_id).await {
1009 Some(key) => key,
1010 None => return Ok(false),
1011 };
1012
1013 key.ip_whitelist = ip_whitelist;
1014 self.update_key(key).await?;
1015
1016 info!("Updated IP whitelist for API key: {}", key_id);
1017 Ok(true)
1018 }
1019
1020 pub async fn list_keys_by_role(&self, role: &Role) -> Vec<ApiKey> {
1022 let cache = self.api_keys_cache.read().await;
1023 cache
1024 .values()
1025 .filter(|key| &key.role == role)
1026 .cloned()
1027 .collect()
1028 }
1029
1030 pub async fn list_active_keys(&self) -> Vec<ApiKey> {
1032 let cache = self.api_keys_cache.read().await;
1033 cache
1034 .values()
1035 .filter(|key| key.active && !key.is_expired())
1036 .cloned()
1037 .collect()
1038 }
1039
1040 pub async fn list_expired_keys(&self) -> Vec<ApiKey> {
1042 let cache = self.api_keys_cache.read().await;
1043 cache
1044 .values()
1045 .filter(|key| key.is_expired())
1046 .cloned()
1047 .collect()
1048 }
1049
1050 pub async fn bulk_revoke_keys(&self, key_ids: &[String]) -> Result<Vec<String>, AuthError> {
1052 let mut revoked = Vec::new();
1053
1054 for key_id in key_ids {
1055 match self.revoke_key(key_id).await {
1056 Ok(true) => revoked.push(key_id.clone()),
1057 Ok(false) => debug!("Key {} was already revoked or not found", key_id),
1058 Err(e) => error!("Failed to revoke key {}: {}", key_id, e),
1059 }
1060 }
1061
1062 info!("Bulk revoked {} keys", revoked.len());
1063 Ok(revoked)
1064 }
1065
1066 pub async fn cleanup_expired_keys(&self) -> Result<u32, AuthError> {
1068 let expired_keys = self.list_expired_keys().await;
1069 let key_ids: Vec<String> = expired_keys.iter().map(|k| k.id.clone()).collect();
1070
1071 let revoked = self.bulk_revoke_keys(&key_ids).await?;
1072
1073 info!("Cleaned up {} expired keys", revoked.len());
1074 Ok(revoked.len() as u32)
1075 }
1076
1077 pub async fn get_key_usage_stats(&self) -> Result<KeyUsageStats, AuthError> {
1079 let cache = self.api_keys_cache.read().await;
1080 let mut stats = KeyUsageStats::default();
1081
1082 for key in cache.values() {
1083 stats.total_keys += 1;
1084
1085 if key.active {
1086 stats.active_keys += 1;
1087 } else {
1088 stats.disabled_keys += 1;
1089 }
1090
1091 if key.is_expired() {
1092 stats.expired_keys += 1;
1093 }
1094
1095 stats.total_usage_count += key.usage_count;
1096
1097 match &key.role {
1099 Role::Admin => stats.admin_keys += 1,
1100 Role::Operator => stats.operator_keys += 1,
1101 Role::Monitor => stats.monitor_keys += 1,
1102 Role::Device { .. } => stats.device_keys += 1,
1103 Role::Custom { .. } => stats.custom_keys += 1,
1104 }
1105 }
1106
1107 Ok(stats)
1108 }
1109
1110 pub async fn bulk_create_keys(
1112 &self,
1113 requests: Vec<KeyCreationRequest>,
1114 ) -> Result<Vec<Result<ApiKey, AuthError>>, AuthError> {
1115 let mut results = Vec::new();
1116
1117 for request in requests {
1118 let result = self
1119 .create_api_key(
1120 request.name,
1121 request.role,
1122 request.expires_at,
1123 request.ip_whitelist,
1124 )
1125 .await;
1126 results.push(result);
1127 }
1128
1129 Ok(results)
1130 }
1131
1132 pub fn check_api_completeness(&self) -> ApiCompletenessCheck {
1134 ApiCompletenessCheck {
1135 has_create_key: true,
1136 has_validate_key: true,
1137 has_list_keys: true,
1138 has_revoke_key: true,
1139 has_update_key: true,
1140 has_bulk_operations: true,
1141 has_role_based_access: true,
1142 has_rate_limiting: true,
1143 has_ip_whitelisting: true,
1144 has_expiration_support: true,
1145 has_usage_tracking: true,
1146 framework_version: env!("CARGO_PKG_VERSION").to_string(),
1147 production_ready: true,
1148 }
1149 }
1150
1151 pub async fn start_background_tasks(&self) -> Result<(), AuthError> {
1152 Ok(())
1153 }
1154
1155 pub async fn stop_background_tasks(&self) -> Result<(), AuthError> {
1156 Ok(())
1157 }
1158
1159 pub async fn health_check(&self) -> Result<(), AuthError> {
1160 Ok(())
1161 }
1162
1163 pub async fn process_request(
1164 &self,
1165 request: Request,
1166 _context: &RequestContext,
1167 ) -> Result<Request, AuthError> {
1168 if !self.config.enabled {
1169 return Ok(request);
1170 }
1171
1172 Ok(request)
1174 }
1175
1176 pub async fn process_response(
1177 &self,
1178 response: Response,
1179 _context: &RequestContext,
1180 ) -> Result<Response, AuthError> {
1181 Ok(response)
1182 }
1183
1184 pub async fn generate_token_for_key(
1188 &self,
1189 key_id: &str,
1190 client_ip: Option<String>,
1191 session_id: Option<String>,
1192 scope: Vec<String>,
1193 ) -> Result<TokenPair, AuthError> {
1194 let key = self
1196 .get_key(key_id)
1197 .await
1198 .ok_or_else(|| AuthError::Failed("API key not found".to_string()))?;
1199
1200 if !key.is_valid() {
1202 return Err(AuthError::Failed(
1203 "API key is invalid or expired".to_string(),
1204 ));
1205 }
1206
1207 let token_pair = self
1209 .jwt_manager
1210 .generate_token_pair(
1211 key.id.clone(),
1212 vec![key.role.clone()],
1213 Some(key.id.clone()),
1214 client_ip.clone(),
1215 session_id.clone(),
1216 scope,
1217 )
1218 .await
1219 .map_err(|e| AuthError::Failed(format!("Token generation failed: {e}")))?;
1220
1221 let audit_event = AuditEvent::new(
1223 AuditEventType::KeyUsed,
1224 AuditSeverity::Info,
1225 "jwt".to_string(),
1226 format!("JWT token pair generated for key {}", key.id),
1227 )
1228 .with_resource(key.id.clone())
1229 .with_client_ip(client_ip.unwrap_or_else(|| "unknown".to_string()));
1230
1231 let _ = self.audit_logger.log(audit_event).await;
1232
1233 Ok(token_pair)
1234 }
1235
1236 pub async fn validate_jwt_token(&self, token: &str) -> Result<AuthContext, AuthError> {
1238 let auth_context = self
1239 .jwt_manager
1240 .token_to_auth_context(token)
1241 .await
1242 .map_err(|e| match e {
1243 crate::jwt::JwtError::Expired => AuthError::Failed("Token expired".to_string()),
1244 crate::jwt::JwtError::InvalidFormat => {
1245 AuthError::Failed("Invalid token format".to_string())
1246 }
1247 _ => AuthError::Failed(format!("Token validation failed: {}", e)),
1248 })?;
1249
1250 let audit_event = AuditEvent::new(
1252 AuditEventType::AuthSuccess,
1253 AuditSeverity::Info,
1254 "jwt".to_string(),
1255 format!("JWT token validated for user {:?}", auth_context.user_id),
1256 );
1257
1258 if let Some(ref user_id) = auth_context.user_id {
1259 let audit_event = audit_event.with_actor(user_id.clone());
1260 let _ = self.audit_logger.log(audit_event).await;
1261 }
1262
1263 Ok(auth_context)
1264 }
1265
1266 pub async fn refresh_jwt_token(
1268 &self,
1269 refresh_token: &str,
1270 client_ip: Option<String>,
1271 scope: Vec<String>,
1272 ) -> Result<String, AuthError> {
1273 let token_info = self
1275 .jwt_manager
1276 .validate_token(refresh_token)
1277 .await
1278 .map_err(|e| AuthError::Failed(format!("Invalid refresh token: {}", e)))?;
1279
1280 let roles = if let Some(key_id) = &token_info.claims.key_id {
1282 let key = self
1283 .get_key(key_id)
1284 .await
1285 .ok_or_else(|| AuthError::Failed("Associated API key not found".to_string()))?;
1286
1287 if !key.is_valid() {
1288 return Err(AuthError::Failed(
1289 "Associated API key is invalid or expired".to_string(),
1290 ));
1291 }
1292
1293 vec![key.role.clone()]
1294 } else {
1295 token_info.claims.roles
1297 };
1298
1299 let access_token = self
1301 .jwt_manager
1302 .refresh_access_token(refresh_token, roles, client_ip.clone(), scope)
1303 .await
1304 .map_err(|e| AuthError::Failed(format!("Token refresh failed: {}", e)))?;
1305
1306 let audit_event = AuditEvent::new(
1308 AuditEventType::KeyUsed,
1309 AuditSeverity::Info,
1310 "jwt".to_string(),
1311 format!(
1312 "JWT access token refreshed for subject {}",
1313 token_info.claims.sub
1314 ),
1315 )
1316 .with_actor(token_info.claims.sub)
1317 .with_client_ip(client_ip.unwrap_or_else(|| "unknown".to_string()));
1318
1319 let _ = self.audit_logger.log(audit_event).await;
1320
1321 Ok(access_token)
1322 }
1323
1324 pub async fn revoke_jwt_token(&self, token: &str) -> Result<(), AuthError> {
1326 self.jwt_manager
1327 .revoke_token(token)
1328 .await
1329 .map_err(|e| AuthError::Failed(format!("Token revocation failed: {}", e)))?;
1330
1331 let audit_event = AuditEvent::new(
1333 AuditEventType::SecurityViolation,
1334 AuditSeverity::Warning,
1335 "jwt".to_string(),
1336 "JWT token revoked".to_string(),
1337 );
1338
1339 let _ = self.audit_logger.log(audit_event).await;
1340
1341 Ok(())
1342 }
1343
1344 pub async fn cleanup_jwt_blacklist(&self) -> Result<usize, AuthError> {
1346 let cleaned = self.jwt_manager.cleanup_blacklist().await;
1347
1348 if cleaned > 0 {
1349 let audit_event = AuditEvent::new(
1350 AuditEventType::SystemStartup,
1351 AuditSeverity::Info,
1352 "jwt".to_string(),
1353 format!("Cleaned up {} expired tokens from blacklist", cleaned),
1354 );
1355
1356 let _ = self.audit_logger.log(audit_event).await;
1357 }
1358
1359 Ok(cleaned)
1360 }
1361
1362 pub fn decode_jwt_token_info(&self, token: &str) -> Result<crate::jwt::TokenClaims, AuthError> {
1364 self.jwt_manager
1365 .decode_token_info(token)
1366 .map_err(|e| AuthError::Failed(format!("Token decoding failed: {}", e)))
1367 }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372 use super::*;
1373 use crate::config::{AuthConfig, StorageConfig};
1374 use crate::models::Role;
1375 use tokio;
1376
1377 fn create_test_config() -> AuthConfig {
1378 AuthConfig {
1379 storage: StorageConfig::Memory,
1380 enabled: true,
1381 cache_size: 100,
1382 session_timeout_secs: 3600,
1383 max_failed_attempts: 3,
1384 rate_limit_window_secs: 300,
1385 }
1386 }
1387
1388 #[allow(dead_code)]
1389 fn create_test_validation_config() -> ValidationConfig {
1390 ValidationConfig {
1391 max_failed_attempts: 3,
1392 failed_attempt_window_minutes: 15,
1393 block_duration_minutes: 30,
1394 session_timeout_minutes: 60,
1395 strict_ip_validation: false,
1396 enable_role_based_rate_limiting: false,
1397 role_rate_limits: HashMap::new(),
1398 }
1399 }
1400
1401 #[tokio::test]
1402 async fn test_auth_manager_creation() {
1403 let config = create_test_config();
1404
1405 let result = AuthenticationManager::new(config).await;
1406 assert!(result.is_ok());
1407 }
1408
1409 #[tokio::test]
1410 async fn test_create_api_key() {
1411 let config = create_test_config();
1412 let manager = AuthenticationManager::new(config).await.unwrap();
1413
1414 let result = manager
1415 .create_api_key("Test Key".to_string(), Role::Monitor, None, None)
1416 .await;
1417 assert!(result.is_ok());
1418
1419 let key = result.unwrap();
1420 assert_eq!(key.name, "Test Key");
1421 assert!(key.id.starts_with("lmcp_"));
1422 assert_eq!(key.role, Role::Monitor);
1423 }
1424}