auth_framework/server/security/
caep_continuous_access.rs

1//! # Continuous Access Evaluation Protocol (CAEP)
2//!
3//! This module implements the Continuous Access Evaluation Protocol (CAEP), enabling
4//! real-time access evaluation and revocation based on security events and risk changes.
5//!
6//! ## Overview
7//!
8//! CAEP extends traditional OAuth 2.0 and OpenID Connect by providing continuous
9//! monitoring and evaluation of access tokens, allowing for immediate revocation
10//! when security conditions change.
11//!
12//! ## Key Features
13//!
14//! - **Real-time Event Processing**: Continuous monitoring of security events
15//! - **Automatic Access Revocation**: Immediate token revocation on security events
16//! - **Cross-system Event Propagation**: Events can trigger actions across multiple systems
17//! - **Risk-based Evaluation**: Dynamic access decisions based on changing risk profiles
18//! - **Session State Management**: Continuous session validity assessment
19//!
20//! ## Event Types
21//!
22//! - **User Events**: Login/logout, profile changes, credential changes
23//! - **Session Events**: Session creation, modification, timeout, suspicious activity
24//! - **Risk Events**: Location changes, device changes, behavioral anomalies
25//! - **Policy Events**: Access policy updates, compliance violations
26//! - **System Events**: Service outages, security incidents
27//!
28//! ## Usage Example
29//!
30//! ```rust,no_run
31//! use auth_framework::server::caep_continuous_access::*;
32//! use auth_framework::server::{SessionManager, oidc_backchannel_logout::BackChannelLogoutManager};
33//! use chrono::Duration;
34//! use std::sync::Arc;
35//! use async_trait::async_trait;
36//!
37//! // Example event handler implementation
38//! struct RiskScoreHandler;
39//!
40//! #[async_trait]
41//! impl CaepEventHandler for RiskScoreHandler {
42//!     async fn handle_event(&self, event: &CaepEvent) -> auth_framework::errors::Result<()> {
43//!         if event.risk_score > 0.8 {
44//!             // High risk - would revoke access in real implementation
45//!             println!("High risk detected: {}", event.risk_score);
46//!         }
47//!         Ok(())
48//!     }
49//!
50//!     fn supported_event_types(&self) -> Vec<CaepEventType> {
51//!         vec![CaepEventType::RiskScoreChange]
52//!     }
53//! }
54//!
55//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
56//! // Initialize CAEP manager (simplified example - in real use, get managers from DI container)
57//! let config = CaepConfig {
58//!     event_stream_url: "wss://events.example.com/caep".to_string(),
59//!     evaluation_interval: Duration::from_std(std::time::Duration::from_secs(30))?,
60//!     auto_revoke: true,
61//!     ..Default::default()
62//! };
63//!
64//! // In real code, create these with proper configuration from your DI container
65//! # let session_config = Default::default();
66//! # let session_manager = Arc::new(SessionManager::new(session_config));
67//! # let logout_config = Default::default();
68//! # let logout_manager = Arc::new(BackChannelLogoutManager::new(logout_config, session_manager.as_ref().clone())?);
69//! # let mut caep_manager = CaepManager::new(config, session_manager, logout_manager).await?;
70//!
71//! // Register event handler
72//! caep_manager.register_event_handler(
73//!     CaepEventType::RiskScoreChange,
74//!     Arc::new(RiskScoreHandler)
75//! ).await?;
76//!
77//! // Start continuous evaluation
78//! caep_manager.start_continuous_evaluation().await?;
79//! # Ok(())
80//! # }
81//! ```
82
83use crate::errors::{AuthError, Result};
84use crate::server::core::stepped_up_auth::SteppedUpAuthManager;
85use crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutManager;
86use crate::server::oidc::oidc_session_management::SessionManager;
87
88use async_trait::async_trait;
89use chrono::{DateTime, Duration, Timelike, Utc};
90use serde::{Deserialize, Serialize};
91use std::collections::HashMap;
92use std::sync::Arc;
93use tokio::sync::{RwLock, broadcast};
94use tokio::time::{Interval, interval};
95use uuid::Uuid;
96
97/// Type alias for complex event handler storage
98type EventHandlerMap = Arc<RwLock<HashMap<CaepEventType, Vec<Arc<dyn CaepEventHandler>>>>>;
99
100/// Configuration for Continuous Access Evaluation Protocol
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct CaepConfig {
103    /// URL for the event stream endpoint
104    pub event_stream_url: String,
105
106    /// How frequently to evaluate access decisions (default: 30 seconds)
107    pub evaluation_interval: Duration,
108
109    /// Whether to automatically revoke access on high-risk events
110    pub auto_revoke: bool,
111
112    /// Minimum risk score threshold for automatic revocation (0.0-1.0)
113    pub auto_revoke_threshold: f32,
114
115    /// Maximum number of concurrent event processors
116    pub max_concurrent_processors: usize,
117
118    /// Event retention period for audit trails
119    pub event_retention_period: Duration,
120
121    /// Cross-system event propagation endpoints
122    pub propagation_endpoints: Vec<String>,
123
124    /// Custom evaluation rules
125    pub evaluation_rules: Vec<CaepEvaluationRule>,
126}
127
128impl Default for CaepConfig {
129    fn default() -> Self {
130        Self {
131            event_stream_url: "wss://localhost:8080/caep/events".to_string(),
132            evaluation_interval: Duration::try_seconds(30).unwrap_or(Duration::zero()),
133            auto_revoke: true,
134            auto_revoke_threshold: 0.8,
135            max_concurrent_processors: 10,
136            event_retention_period: Duration::try_hours(24).unwrap_or(Duration::zero()),
137            propagation_endpoints: Vec::new(),
138            evaluation_rules: Vec::new(),
139        }
140    }
141}
142
143/// Types of CAEP events
144#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
145#[serde(rename_all = "snake_case")]
146pub enum CaepEventType {
147    /// User authentication events
148    UserLogin,
149    UserLogout,
150    UserProfileChange,
151    UserCredentialChange,
152
153    /// Session-related events
154    SessionCreated,
155    SessionModified,
156    SessionTimeout,
157    SessionSuspiciousActivity,
158
159    /// Risk assessment events
160    RiskScoreChange,
161    LocationChange,
162    DeviceChange,
163    BehavioralAnomaly,
164
165    /// Policy and compliance events
166    PolicyUpdate,
167    ComplianceViolation,
168    AccessPatternAnomaly,
169
170    /// System and security events
171    SystemOutage,
172    SecurityIncident,
173    DataBreach,
174
175    /// Custom events for extensibility
176    Custom(String),
177}
178
179/// Severity levels for CAEP events
180#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
181#[serde(rename_all = "snake_case")]
182pub enum CaepEventSeverity {
183    Low,
184    Medium,
185    High,
186    Critical,
187}
188
189/// Source of a CAEP event
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct CaepEventSource {
192    /// Identifier of the source system
193    pub system_id: String,
194
195    /// Type of source (e.g., "identity_provider", "risk_engine", "policy_engine")
196    pub source_type: String,
197
198    /// Version of the source system
199    pub version: Option<String>,
200
201    /// Additional source metadata
202    pub metadata: HashMap<String, serde_json::Value>,
203}
204
205/// A CAEP security event
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct CaepEvent {
208    /// Unique event identifier
209    pub id: Uuid,
210
211    /// Type of event
212    pub event_type: CaepEventType,
213
214    /// Subject (user) associated with the event
215    pub subject: String,
216
217    /// Severity level of the event
218    pub severity: CaepEventSeverity,
219
220    /// When the event occurred
221    pub timestamp: DateTime<Utc>,
222
223    /// Source of the event
224    pub source: CaepEventSource,
225
226    /// Current risk score (0.0-1.0)
227    pub risk_score: f32,
228
229    /// Session ID if applicable
230    pub session_id: Option<String>,
231
232    /// Geographic location information
233    pub location: Option<CaepLocationInfo>,
234
235    /// Device information
236    pub device_info: Option<CaepDeviceInfo>,
237
238    /// Event-specific data
239    pub event_data: serde_json::Value,
240
241    /// Correlation ID for related events
242    pub correlation_id: Option<Uuid>,
243}
244
245/// Geographic location information for CAEP events
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct CaepLocationInfo {
248    /// Country code (ISO 3166-1 alpha-2)
249    pub country: Option<String>,
250
251    /// Region or state
252    pub region: Option<String>,
253
254    /// City name
255    pub city: Option<String>,
256
257    /// Latitude coordinate
258    pub latitude: Option<f64>,
259
260    /// Longitude coordinate
261    pub longitude: Option<f64>,
262
263    /// IP address
264    pub ip_address: Option<String>,
265
266    /// Whether location is considered suspicious
267    pub is_suspicious: bool,
268}
269
270/// Device information for CAEP events
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct CaepDeviceInfo {
273    /// Device identifier
274    pub device_id: Option<String>,
275
276    /// Device type (mobile, desktop, tablet, etc.)
277    pub device_type: Option<String>,
278
279    /// Operating system
280    pub os: Option<String>,
281
282    /// Browser or client application
283    pub client: Option<String>,
284
285    /// Whether device is trusted
286    pub is_trusted: bool,
287
288    /// Whether device binding is required
289    pub requires_binding: bool,
290}
291
292/// Evaluation rule for continuous access decisions
293#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct CaepEvaluationRule {
295    /// Rule identifier
296    pub id: String,
297
298    /// Human-readable description
299    pub description: String,
300
301    /// Event types this rule applies to
302    pub applicable_events: Vec<CaepEventType>,
303
304    /// Conditions that must be met
305    pub conditions: Vec<CaepRuleCondition>,
306
307    /// Actions to take when rule is triggered
308    pub actions: Vec<CaepRuleAction>,
309
310    /// Priority of this rule (higher numbers = higher priority)
311    pub priority: i32,
312
313    /// Whether rule is currently enabled
314    pub enabled: bool,
315}
316
317/// Condition for a CAEP evaluation rule
318#[derive(Debug, Clone, Serialize, Deserialize)]
319#[serde(tag = "type", rename_all = "snake_case")]
320pub enum CaepRuleCondition {
321    /// Risk score threshold condition
322    RiskScoreAbove { threshold: f32 },
323
324    /// Event severity condition
325    SeverityAtLeast { severity: CaepEventSeverity },
326
327    /// Location-based condition
328    LocationChange { suspicious_only: bool },
329
330    /// Device-based condition
331    UnknownDevice { require_trusted: bool },
332
333    /// Time-based condition
334    OutsideBusinessHours { timezone: String },
335
336    /// Custom condition with expression
337    Custom { expression: String },
338}
339
340/// Action to take when a CAEP rule is triggered
341#[derive(Debug, Clone, Serialize, Deserialize)]
342#[serde(tag = "type", rename_all = "snake_case")]
343pub enum CaepRuleAction {
344    /// Revoke access tokens for the subject
345    RevokeAccess { immediate: bool },
346
347    /// Require step-up authentication
348    RequireStepUp { level: String },
349
350    /// Send notification
351    SendNotification { channels: Vec<String> },
352
353    /// Log security event
354    LogEvent { level: String },
355
356    /// Trigger external webhook
357    TriggerWebhook { url: String },
358
359    /// Quarantine session
360    QuarantineSession { duration_minutes: u32 },
361}
362
363/// Result of a continuous access evaluation
364#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct CaepEvaluationResult {
366    /// Subject being evaluated
367    pub subject: String,
368
369    /// Current access decision
370    pub access_decision: CaepAccessDecision,
371
372    /// Current risk score
373    pub risk_score: f32,
374
375    /// Triggered rules
376    pub triggered_rules: Vec<String>,
377
378    /// Required actions
379    pub required_actions: Vec<CaepRuleAction>,
380
381    /// Evaluation timestamp
382    pub evaluated_at: DateTime<Utc>,
383
384    /// Next evaluation time
385    pub next_evaluation: DateTime<Utc>,
386}
387
388/// Access decision from CAEP evaluation
389#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
390#[serde(rename_all = "snake_case")]
391pub enum CaepAccessDecision {
392    /// Access granted - continue as normal
393    Allow,
394
395    /// Access granted but requires monitoring
396    AllowWithMonitoring,
397
398    /// Access granted but requires step-up authentication
399    AllowWithStepUp,
400
401    /// Access temporarily denied - retry later
402    TemporaryDeny,
403
404    /// Access permanently denied - revoke tokens
405    Deny,
406}
407
408/// State of a CAEP session
409#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct CaepSessionState {
411    /// Session identifier
412    pub session_id: String,
413
414    /// Subject (user) of the session
415    pub subject: String,
416
417    /// Current risk score
418    pub risk_score: f32,
419
420    /// Last evaluation result
421    pub last_evaluation: Option<CaepEvaluationResult>,
422
423    /// Active events for this session
424    pub active_events: Vec<CaepEvent>,
425
426    /// Session creation time
427    pub created_at: DateTime<Utc>,
428
429    /// Last activity time
430    pub last_activity: DateTime<Utc>,
431
432    /// Whether session is quarantined
433    pub is_quarantined: bool,
434
435    /// Quarantine end time if applicable
436    pub quarantine_until: Option<DateTime<Utc>>,
437}
438
439/// Comprehensive session information combining OIDC and CAEP data
440#[derive(Debug, Clone, Serialize, Deserialize)]
441pub struct ComprehensiveSessionInfo {
442    /// OIDC session information
443    pub oidc_session: crate::server::oidc::oidc_session_management::OidcSession,
444
445    /// CAEP session information if available
446    pub caep_session: Option<CaepSessionState>,
447
448    /// Whether this session is being monitored by CAEP
449    pub is_monitored_by_caep: bool,
450}
451
452/// Event handler trait for CAEP events
453#[async_trait]
454pub trait CaepEventHandler: Send + Sync {
455    /// Handle a CAEP event
456    async fn handle_event(&self, event: &CaepEvent) -> Result<()>;
457
458    /// Get event types this handler can process
459    fn supported_event_types(&self) -> Vec<CaepEventType>;
460}
461
462/// Main CAEP manager for continuous access evaluation
463pub struct CaepManager {
464    /// Configuration
465    config: CaepConfig,
466
467    /// Session manager integration
468    session_manager: Arc<SessionManager>,
469
470    /// Logout manager for revocations
471    logout_manager: Arc<BackChannelLogoutManager>,
472
473    /// Step-up authentication manager
474    step_up_manager: Option<Arc<SteppedUpAuthManager>>,
475
476    /// Active sessions being monitored
477    sessions: Arc<RwLock<HashMap<String, CaepSessionState>>>,
478
479    /// Event handlers by type
480    event_handlers: EventHandlerMap,
481
482    /// Event stream broadcaster
483    event_broadcaster: broadcast::Sender<CaepEvent>,
484
485    /// Evaluation timer
486    evaluation_interval: Interval,
487
488    /// Event history for audit trails
489    event_history: Arc<RwLock<Vec<CaepEvent>>>,
490
491    /// Evaluation rules
492    rules: Arc<RwLock<Vec<CaepEvaluationRule>>>,
493}
494
495impl CaepManager {
496    /// Create a new CAEP manager
497    pub async fn new(
498        config: CaepConfig,
499        session_manager: Arc<SessionManager>,
500        logout_manager: Arc<BackChannelLogoutManager>,
501    ) -> Result<Self> {
502        let (event_broadcaster, _) = broadcast::channel(1000);
503        let evaluation_interval = interval(config.evaluation_interval.to_std().map_err(|e| {
504            AuthError::Configuration {
505                message: format!("Invalid evaluation interval: {}", e),
506                help: Some("Provide a valid duration for evaluation interval".to_string()),
507                docs_url: Some("https://docs.auth-framework.com/configuration".to_string()),
508                source: None,
509                suggested_fix: Some("Check your configuration and ensure the evaluation interval is properly formatted".to_string()),
510            }
511        })?);
512
513        Ok(Self {
514            config: config.clone(),
515            session_manager,
516            logout_manager,
517            step_up_manager: None,
518            sessions: Arc::new(RwLock::new(HashMap::new())),
519            event_handlers: Arc::new(RwLock::new(HashMap::new())),
520            event_broadcaster,
521            evaluation_interval,
522            event_history: Arc::new(RwLock::new(Vec::new())),
523            rules: Arc::new(RwLock::new(config.evaluation_rules)),
524        })
525    }
526
527    /// Set step-up authentication manager
528    pub fn with_step_up_manager(mut self, step_up_manager: Arc<SteppedUpAuthManager>) -> Self {
529        self.step_up_manager = Some(step_up_manager);
530        self
531    }
532
533    /// Register an event handler
534    pub async fn register_event_handler(
535        &self,
536        event_type: CaepEventType,
537        handler: Arc<dyn CaepEventHandler>,
538    ) -> Result<()> {
539        let mut handlers = self.event_handlers.write().await;
540        handlers.entry(event_type).or_default().push(handler);
541        Ok(())
542    }
543
544    /// Process a CAEP event
545    pub async fn process_event(&self, event: CaepEvent) -> Result<CaepEvaluationResult> {
546        // Add event to history
547        {
548            let mut history = self.event_history.write().await;
549            history.push(event.clone());
550
551            // Cleanup old events
552            let retention_cutoff = Utc::now() - self.config.event_retention_period;
553            history.retain(|e| e.timestamp >= retention_cutoff);
554        }
555
556        // Broadcast event
557        if let Err(e) = self.event_broadcaster.send(event.clone()) {
558            log::warn!("Failed to broadcast CAEP event: {}", e);
559        }
560
561        // Update session state
562        if let Some(session_id) = &event.session_id {
563            self.update_session_state(session_id, &event).await?;
564        }
565
566        // Evaluate access decision
567        let evaluation_result = self.evaluate_access(&event.subject, Some(&event)).await?;
568
569        // Execute required actions
570        self.execute_actions(&evaluation_result).await?;
571
572        // Notify registered handlers
573        self.notify_handlers(&event).await?;
574
575        Ok(evaluation_result)
576    }
577
578    /// Evaluate continuous access for a subject
579    pub async fn evaluate_access(
580        &self,
581        subject: &str,
582        triggering_event: Option<&CaepEvent>,
583    ) -> Result<CaepEvaluationResult> {
584        let rules = self.rules.read().await;
585        let mut triggered_rules = Vec::new();
586        let mut required_actions = Vec::new();
587        let risk_score = if let Some(event) = triggering_event {
588            event.risk_score
589        } else {
590            // Calculate risk from recent events
591            self.calculate_risk_score(subject).await?
592        };
593
594        // Apply evaluation rules
595        for rule in rules.iter() {
596            if !rule.enabled {
597                continue;
598            }
599
600            if let Some(event) = triggering_event
601                && !rule.applicable_events.contains(&event.event_type)
602            {
603                continue;
604            }
605
606            if self
607                .evaluate_rule_conditions(rule, subject, triggering_event, risk_score)
608                .await?
609            {
610                triggered_rules.push(rule.id.clone());
611                required_actions.extend(rule.actions.clone());
612            }
613        }
614
615        // Determine access decision
616        let access_decision = self.determine_access_decision(risk_score, &required_actions);
617
618        let now = Utc::now();
619        Ok(CaepEvaluationResult {
620            subject: subject.to_string(),
621            access_decision,
622            risk_score,
623            triggered_rules,
624            required_actions,
625            evaluated_at: now,
626            next_evaluation: now + self.config.evaluation_interval,
627        })
628    }
629
630    /// Start continuous evaluation loop
631    pub async fn start_continuous_evaluation(&mut self) -> Result<()> {
632        loop {
633            self.evaluation_interval.tick().await;
634
635            // First, synchronize with SessionManager to clean up stale sessions
636            self.synchronize_with_session_manager().await?;
637
638            // Evaluate all active sessions
639            let sessions = {
640                let sessions_guard = self.sessions.read().await;
641                sessions_guard.keys().cloned().collect::<Vec<_>>()
642            };
643
644            for session_id in sessions {
645                if let Some(session_state) = self.sessions.read().await.get(&session_id) {
646                    let evaluation = self.evaluate_access(&session_state.subject, None).await?;
647                    self.execute_actions(&evaluation).await?;
648                }
649            }
650        }
651    }
652
653    /// Synchronize CAEP sessions with SessionManager
654    async fn synchronize_with_session_manager(&self) -> Result<()> {
655        let mut sessions = self.sessions.write().await;
656        let mut sessions_to_remove = Vec::new();
657
658        for (session_id, caep_session) in sessions.iter() {
659            // Check if session still exists and is valid in SessionManager
660            if let Some(oidc_session) = self.session_manager.get_session(session_id) {
661                if !self.session_manager.is_session_valid(session_id) {
662                    log::info!("CAEP removing expired session: {}", session_id);
663                    sessions_to_remove.push(session_id.clone());
664                }
665                // Verify subject consistency
666                else if oidc_session.sub != caep_session.subject {
667                    log::warn!("CAEP session subject mismatch, removing: {}", session_id);
668                    sessions_to_remove.push(session_id.clone());
669                }
670            } else {
671                log::info!("CAEP removing orphaned session: {}", session_id);
672                sessions_to_remove.push(session_id.clone());
673            }
674        }
675
676        // Remove stale sessions
677        for session_id in sessions_to_remove {
678            sessions.remove(&session_id);
679        }
680
681        Ok(())
682    }
683
684    /// Revoke access for a subject
685    pub async fn revoke_subject_access(&self, subject: &str) -> Result<()> {
686        log::info!("CAEP revoking access for subject: {}", subject);
687
688        // Find all sessions for this subject and initiate back-channel logout
689        let sessions_to_logout = {
690            let sessions = self.sessions.read().await;
691            sessions
692                .iter()
693                .filter(|(_, session)| session.subject == subject)
694                .map(|(session_id, session)| (session_id.clone(), session.clone()))
695                .collect::<Vec<_>>()
696        };
697
698        // Process back-channel logout for each session
699        for (session_id, _) in &sessions_to_logout {
700            // Use the BackChannelLogoutManager to perform proper logout
701            let logout_request =
702                crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest {
703                    session_id: session_id.clone(),
704                    sub: subject.to_string(),
705                    sid: Some(session_id.clone()),
706                    iss: "caep-manager".to_string(), // In production, use actual issuer
707                    initiating_client_id: None,      // CAEP-initiated logout
708                    additional_events: Some({
709                        let mut events = HashMap::new();
710                        events.insert(
711                            "caep_reason".to_string(),
712                            serde_json::json!("automatic_revocation"),
713                        );
714                        events.insert(
715                            "timestamp".to_string(),
716                            serde_json::json!(Utc::now().timestamp()),
717                        );
718                        events
719                    }),
720                };
721
722            // Process the logout through the BackChannelLogoutManager
723            // Use async approach to handle logout manager integration
724            match self.process_backchannel_logout(&logout_request).await {
725                Ok(_) => {
726                    log::info!(
727                        "Successfully initiated back-channel logout for session {} (subject: {})",
728                        session_id,
729                        subject
730                    );
731                }
732                Err(e) => {
733                    log::error!(
734                        "Failed to initiate back-channel logout for session {} (subject: {}): {}",
735                        session_id,
736                        subject,
737                        e
738                    );
739                }
740            }
741        }
742
743        // Remove from CAEP active sessions after logout processing
744        let mut sessions = self.sessions.write().await;
745        sessions.retain(|_, session| session.subject != subject);
746
747        Ok(())
748    }
749
750    /// Process back-channel logout through the logout manager
751    async fn process_backchannel_logout(
752        &self,
753        logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
754    ) -> Result<()> {
755        // Use the logout manager to process the logout request
756        // This integrates CAEP with the existing logout infrastructure
757
758        // Use the logout manager to get metadata and validate capabilities
759        let logout_metadata = self.logout_manager.get_discovery_metadata();
760        log::info!("Logout manager capabilities: {:?}", logout_metadata);
761
762        // Create CAEP-specific logout token based on the result
763        let logout_token = self
764            .create_logout_token_for_caep_revocation(logout_request)
765            .await?;
766
767        // Handle CAEP-specific logout processing
768        self.handle_caep_logout(logout_request, &logout_token)
769            .await?;
770
771        log::info!("CAEP backchannel logout processed successfully");
772        Ok(())
773    }
774
775    /// Create logout token for CAEP-initiated revocation
776    async fn create_logout_token_for_caep_revocation(
777        &self,
778        logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
779    ) -> Result<String> {
780        use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
781        use serde_json::json;
782
783        // Create CAEP-specific logout token claims
784        let claims = json!({
785            "iss": logout_request.iss,
786            "sub": logout_request.sub,
787            "aud": ["caep-manager"],
788            "exp": (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp(),
789            "iat": chrono::Utc::now().timestamp(),
790            "jti": uuid::Uuid::new_v4().to_string(),
791            "sid": logout_request.sid,
792            "events": {
793                "http://schemas.openid.net/secevent/caep/event-type/session-revoked": {}
794            },
795            "caep_reason": logout_request.additional_events
796                .as_ref()
797                .and_then(|events| events.get("caep_reason"))
798                .unwrap_or(&serde_json::json!("automatic_revocation"))
799        });
800
801        // In production, use proper signing key
802        let key = EncodingKey::from_secret("caep-secret".as_ref());
803        let header = Header::new(Algorithm::HS256);
804
805        let token = encode(&header, &claims, &key).map_err(|e| {
806            AuthError::auth_method("caep", format!("Failed to create logout token: {}", e))
807        })?;
808
809        Ok(token)
810    }
811
812    /// Handle CAEP logout processing
813    async fn handle_caep_logout(
814        &self,
815        logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
816        logout_token: &str,
817    ) -> Result<()> {
818        // This method integrates with the logout manager's functionality
819        // In a production system, this would:
820        // 1. Validate the logout token
821        // 2. Notify all relevant clients about the session termination
822        // 3. Update session state in persistent storage
823        // 4. Trigger any cleanup procedures
824
825        log::info!(
826            "Processing CAEP logout for session: {}",
827            logout_request.session_id
828        );
829
830        // Update CAEP session state to reflect logout
831        {
832            let mut sessions = self.sessions.write().await;
833            if let Some(_session) = sessions.get(&logout_request.session_id) {
834                // Remove the session entirely as it's being terminated
835                sessions.remove(&logout_request.session_id);
836            }
837        }
838
839        // Emit CAEP event for the logout
840        let caep_event = CaepEvent {
841            id: uuid::Uuid::new_v4(),
842            event_type: CaepEventType::UserLogout, // Use existing event type
843            subject: logout_request.sub.clone(),
844            session_id: Some(logout_request.session_id.clone()),
845            timestamp: chrono::Utc::now(),
846            severity: CaepEventSeverity::High,
847            source: CaepEventSource {
848                system_id: "caep-manager".to_string(),
849                source_type: "caep_automatic_revocation".to_string(),
850                version: Some("1.0".to_string()),
851                metadata: std::collections::HashMap::new(),
852            },
853            risk_score: 1.0, // High risk score for revoked sessions
854            location: None,
855            device_info: None,
856            event_data: serde_json::json!({
857                "logout_token": logout_token,
858                "initiator": "caep_automatic_revocation",
859                "reason": logout_request.additional_events
860                    .as_ref()
861                    .and_then(|events| events.get("caep_reason"))
862                    .cloned()
863                    .unwrap_or_else(|| serde_json::json!("automatic_revocation"))
864            }),
865            correlation_id: Some(uuid::Uuid::new_v4()),
866        };
867
868        // Broadcast the event
869        if let Err(e) = self.event_broadcaster.send(caep_event) {
870            log::warn!("Failed to broadcast CAEP logout event: {}", e);
871        }
872
873        log::info!(
874            "CAEP logout completed for session: {}",
875            logout_request.session_id
876        );
877        Ok(())
878    }
879
880    /// Calculate risk score for a subject based on recent events
881    async fn calculate_risk_score(&self, subject: &str) -> Result<f32> {
882        let history = self.event_history.read().await;
883        let recent_cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
884
885        let recent_events: Vec<_> = history
886            .iter()
887            .filter(|e| e.subject == subject && e.timestamp >= recent_cutoff)
888            .collect();
889
890        if recent_events.is_empty() {
891            return Ok(0.0);
892        }
893
894        // Calculate weighted risk score
895        let mut total_risk = 0.0;
896        let mut total_weight = 0.0;
897
898        for event in recent_events {
899            let weight = match event.severity {
900                CaepEventSeverity::Low => 1.0,
901                CaepEventSeverity::Medium => 2.0,
902                CaepEventSeverity::High => 4.0,
903                CaepEventSeverity::Critical => 8.0,
904            };
905
906            total_risk += event.risk_score * weight;
907            total_weight += weight;
908        }
909
910        Ok(if total_weight > 0.0 {
911            (total_risk / total_weight).min(1.0)
912        } else {
913            0.0
914        })
915    }
916
917    /// Update session state based on an event
918    async fn update_session_state(&self, session_id: &str, event: &CaepEvent) -> Result<()> {
919        // First, validate the session exists in the SessionManager
920        if let Some(oidc_session) = self.session_manager.get_session(session_id) {
921            // Verify the session is still valid
922            if !self.session_manager.is_session_valid(session_id) {
923                log::warn!(
924                    "CAEP received event for expired OIDC session: {}",
925                    session_id
926                );
927                // Remove from CAEP sessions as well
928                let mut sessions = self.sessions.write().await;
929                sessions.remove(session_id);
930                return Ok(());
931            }
932
933            // Ensure subjects match
934            if oidc_session.sub != event.subject {
935                return Err(AuthError::validation(
936                    "Subject mismatch between CAEP event and OIDC session",
937                ));
938            }
939        } else {
940            log::warn!(
941                "CAEP received event for unknown OIDC session: {}",
942                session_id
943            );
944            return Err(AuthError::validation("Session not found in SessionManager"));
945        }
946
947        // Update CAEP-specific session state
948        let mut sessions = self.sessions.write().await;
949
950        let session_state =
951            sessions
952                .entry(session_id.to_string())
953                .or_insert_with(|| CaepSessionState {
954                    session_id: session_id.to_string(),
955                    subject: event.subject.clone(),
956                    risk_score: event.risk_score,
957                    last_evaluation: None,
958                    active_events: Vec::new(),
959                    created_at: Utc::now(),
960                    last_activity: Utc::now(),
961                    is_quarantined: false,
962                    quarantine_until: None,
963                });
964
965        session_state.risk_score = event.risk_score;
966        session_state.last_activity = Utc::now();
967        session_state.active_events.push(event.clone());
968
969        // Remove old events
970        let cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
971        session_state
972            .active_events
973            .retain(|e| e.timestamp >= cutoff);
974
975        Ok(())
976    }
977
978    /// Evaluate rule conditions
979    async fn evaluate_rule_conditions(
980        &self,
981        rule: &CaepEvaluationRule,
982        _subject: &str,
983        event: Option<&CaepEvent>,
984        risk_score: f32,
985    ) -> Result<bool> {
986        for condition in &rule.conditions {
987            match condition {
988                CaepRuleCondition::RiskScoreAbove { threshold } => {
989                    if risk_score <= *threshold {
990                        return Ok(false);
991                    }
992                }
993                CaepRuleCondition::SeverityAtLeast { severity } => {
994                    if let Some(event) = event {
995                        let event_severity_level = match event.severity {
996                            CaepEventSeverity::Critical => 4,
997                            CaepEventSeverity::High => 3,
998                            CaepEventSeverity::Medium => 2,
999                            CaepEventSeverity::Low => 1,
1000                        };
1001
1002                        let required_severity_level = match severity {
1003                            CaepEventSeverity::Critical => 4,
1004                            CaepEventSeverity::High => 3,
1005                            CaepEventSeverity::Medium => 2,
1006                            CaepEventSeverity::Low => 1,
1007                        };
1008
1009                        if event_severity_level < required_severity_level {
1010                            return Ok(false);
1011                        }
1012                    } else {
1013                        return Ok(false);
1014                    }
1015                }
1016                CaepRuleCondition::LocationChange { suspicious_only } => {
1017                    if let Some(event) = event {
1018                        if let Some(location) = &event.location {
1019                            if *suspicious_only && !location.is_suspicious {
1020                                return Ok(false);
1021                            }
1022                        } else {
1023                            return Ok(false);
1024                        }
1025                    } else {
1026                        return Ok(false);
1027                    }
1028                }
1029                CaepRuleCondition::UnknownDevice { require_trusted } => {
1030                    if let Some(event) = event
1031                        && let Some(device) = &event.device_info
1032                        && *require_trusted
1033                        && device.is_trusted
1034                    {
1035                        return Ok(false);
1036                    }
1037                }
1038                CaepRuleCondition::OutsideBusinessHours { timezone: _ } => {
1039                    // Simplified: assume business hours are 9 AM - 5 PM UTC
1040                    let hour = Utc::now().hour();
1041                    if (9..17).contains(&hour) {
1042                        return Ok(false);
1043                    }
1044                }
1045                CaepRuleCondition::Custom { expression: _ } => {
1046                    // Custom expressions would need a proper expression evaluator
1047                    // For now, always evaluate to true
1048                }
1049            }
1050        }
1051
1052        Ok(true)
1053    }
1054
1055    /// Determine access decision based on risk and actions
1056    fn determine_access_decision(
1057        &self,
1058        risk_score: f32,
1059        actions: &[CaepRuleAction],
1060    ) -> CaepAccessDecision {
1061        for action in actions {
1062            match action {
1063                CaepRuleAction::RevokeAccess { immediate: true } => {
1064                    return CaepAccessDecision::Deny;
1065                }
1066                CaepRuleAction::RevokeAccess { immediate: false } => {
1067                    return CaepAccessDecision::TemporaryDeny;
1068                }
1069                CaepRuleAction::RequireStepUp { .. } => {
1070                    return CaepAccessDecision::AllowWithStepUp;
1071                }
1072                CaepRuleAction::QuarantineSession { .. } => {
1073                    return CaepAccessDecision::TemporaryDeny;
1074                }
1075                _ => {}
1076            }
1077        }
1078
1079        if risk_score >= self.config.auto_revoke_threshold {
1080            CaepAccessDecision::Deny
1081        } else if risk_score >= 0.6 {
1082            CaepAccessDecision::AllowWithMonitoring
1083        } else {
1084            CaepAccessDecision::Allow
1085        }
1086    }
1087
1088    /// Execute required actions from evaluation
1089    async fn execute_actions(&self, evaluation: &CaepEvaluationResult) -> Result<()> {
1090        for action in &evaluation.required_actions {
1091            match action {
1092                CaepRuleAction::RevokeAccess { .. } => {
1093                    self.revoke_subject_access(&evaluation.subject).await?;
1094                }
1095                CaepRuleAction::RequireStepUp { level } => {
1096                    if let Some(_step_up_manager) = &self.step_up_manager {
1097                        // Trigger step-up authentication
1098                        log::info!(
1099                            "CAEP requiring step-up to level {} for subject {}",
1100                            level,
1101                            evaluation.subject
1102                        );
1103                    }
1104                }
1105                CaepRuleAction::SendNotification { channels } => {
1106                    log::info!(
1107                        "CAEP sending notification via channels {:?} for subject {}",
1108                        channels,
1109                        evaluation.subject
1110                    );
1111                }
1112                CaepRuleAction::LogEvent { level } => {
1113                    log::info!(
1114                        "CAEP logging event at level {} for subject {}",
1115                        level,
1116                        evaluation.subject
1117                    );
1118                }
1119                CaepRuleAction::TriggerWebhook { url } => {
1120                    log::info!(
1121                        "CAEP triggering webhook {} for subject {}",
1122                        url,
1123                        evaluation.subject
1124                    );
1125                }
1126                CaepRuleAction::QuarantineSession { duration_minutes } => {
1127                    self.quarantine_session(&evaluation.subject, *duration_minutes)
1128                        .await?;
1129                }
1130            }
1131        }
1132
1133        Ok(())
1134    }
1135
1136    /// Quarantine a session
1137    async fn quarantine_session(&self, subject: &str, duration_minutes: u32) -> Result<()> {
1138        let mut sessions = self.sessions.write().await;
1139        let quarantine_until =
1140            Utc::now() + Duration::try_minutes(duration_minutes as i64).unwrap_or(Duration::zero());
1141
1142        // Update CAEP session state
1143        let mut quarantined_session_ids = Vec::new();
1144        for session in sessions.values_mut() {
1145            if session.subject == subject {
1146                session.is_quarantined = true;
1147                session.quarantine_until = Some(quarantine_until);
1148                quarantined_session_ids.push(session.session_id.clone());
1149            }
1150        }
1151
1152        log::info!(
1153            "CAEP quarantined {} sessions for subject {} until {}. Session IDs: {:?}",
1154            quarantined_session_ids.len(),
1155            subject,
1156            quarantine_until,
1157            quarantined_session_ids
1158        );
1159
1160        // In a production implementation, you might want to notify the SessionManager
1161        // about the quarantine status through a separate event or notification system
1162
1163        Ok(())
1164    }
1165
1166    /// Notify registered event handlers
1167    async fn notify_handlers(&self, event: &CaepEvent) -> Result<()> {
1168        let handlers = self.event_handlers.read().await;
1169
1170        if let Some(event_handlers) = handlers.get(&event.event_type) {
1171            for handler in event_handlers {
1172                if let Err(e) = handler.handle_event(event).await {
1173                    log::error!("CAEP event handler failed: {}", e);
1174                }
1175            }
1176        }
1177
1178        Ok(())
1179    }
1180
1181    /// Get current session state
1182    pub async fn get_session_state(&self, session_id: &str) -> Result<Option<CaepSessionState>> {
1183        // First validate with SessionManager
1184        if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1185            if !self.session_manager.is_session_valid(session_id) {
1186                // Session is expired in SessionManager, remove from CAEP as well
1187                let mut sessions = self.sessions.write().await;
1188                sessions.remove(session_id);
1189                return Ok(None);
1190            }
1191
1192            // Return CAEP session state if it exists and subjects match
1193            let sessions = self.sessions.read().await;
1194            if let Some(caep_session) = sessions.get(session_id) {
1195                if caep_session.subject == oidc_session.sub {
1196                    return Ok(Some(caep_session.clone()));
1197                } else {
1198                    log::warn!(
1199                        "Subject mismatch between CAEP and OIDC sessions for {}",
1200                        session_id
1201                    );
1202                    return Ok(None);
1203                }
1204            }
1205        }
1206
1207        // No valid OIDC session found
1208        Ok(None)
1209    }
1210
1211    /// Get event history for a subject
1212    pub async fn get_event_history(
1213        &self,
1214        subject: &str,
1215        limit: Option<usize>,
1216    ) -> Result<Vec<CaepEvent>> {
1217        let history = self.event_history.read().await;
1218        let mut events: Vec<_> = history
1219            .iter()
1220            .filter(|e| e.subject == subject)
1221            .cloned()
1222            .collect();
1223
1224        events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1225
1226        if let Some(limit) = limit {
1227            events.truncate(limit);
1228        }
1229
1230        Ok(events)
1231    }
1232
1233    /// Add or update an evaluation rule
1234    pub async fn add_evaluation_rule(&self, rule: CaepEvaluationRule) -> Result<()> {
1235        let mut rules = self.rules.write().await;
1236
1237        // Remove existing rule with same ID
1238        rules.retain(|r| r.id != rule.id);
1239
1240        // Insert new rule and sort by priority
1241        rules.push(rule);
1242        rules.sort_by(|a, b| b.priority.cmp(&a.priority));
1243
1244        Ok(())
1245    }
1246
1247    /// Remove an evaluation rule
1248    pub async fn remove_evaluation_rule(&self, rule_id: &str) -> Result<bool> {
1249        let mut rules = self.rules.write().await;
1250        let original_len = rules.len();
1251        rules.retain(|r| r.id != rule_id);
1252        Ok(rules.len() < original_len)
1253    }
1254
1255    /// Get comprehensive session information combining OIDC and CAEP data
1256    pub async fn get_comprehensive_session_info(
1257        &self,
1258        session_id: &str,
1259    ) -> Result<Option<ComprehensiveSessionInfo>> {
1260        // Get OIDC session information
1261        if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1262            if !self.session_manager.is_session_valid(session_id) {
1263                return Ok(None);
1264            }
1265
1266            // Get CAEP session information if available
1267            let caep_session = {
1268                let sessions = self.sessions.read().await;
1269                sessions.get(session_id).cloned()
1270            };
1271
1272            let comprehensive_info = ComprehensiveSessionInfo {
1273                oidc_session: oidc_session.clone(),
1274                is_monitored_by_caep: caep_session.is_some(),
1275                caep_session,
1276            };
1277
1278            Ok(Some(comprehensive_info))
1279        } else {
1280            Ok(None)
1281        }
1282    }
1283
1284    /// Get all sessions for a subject with comprehensive information
1285    pub async fn get_subject_sessions(
1286        &self,
1287        subject: &str,
1288    ) -> Result<Vec<ComprehensiveSessionInfo>> {
1289        let oidc_sessions = self.session_manager.get_sessions_for_subject(subject);
1290        let mut comprehensive_sessions = Vec::new();
1291
1292        for oidc_session in oidc_sessions {
1293            if self
1294                .session_manager
1295                .is_session_valid(&oidc_session.session_id)
1296            {
1297                let caep_session = {
1298                    let sessions = self.sessions.read().await;
1299                    sessions.get(&oidc_session.session_id).cloned()
1300                };
1301
1302                comprehensive_sessions.push(ComprehensiveSessionInfo {
1303                    oidc_session: oidc_session.clone(),
1304                    is_monitored_by_caep: caep_session.is_some(),
1305                    caep_session,
1306                });
1307            }
1308        }
1309
1310        Ok(comprehensive_sessions)
1311    }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316    use super::*;
1317    use tokio;
1318
1319    #[tokio::test]
1320    async fn test_caep_event_creation() {
1321        let event = CaepEvent {
1322            id: Uuid::new_v4(),
1323            event_type: CaepEventType::RiskScoreChange,
1324            subject: "user123".to_string(),
1325            severity: CaepEventSeverity::High,
1326            timestamp: Utc::now(),
1327            source: CaepEventSource {
1328                system_id: "risk_engine".to_string(),
1329                source_type: "ml_model".to_string(),
1330                version: Some("1.0.0".to_string()),
1331                metadata: HashMap::new(),
1332            },
1333            risk_score: 0.85,
1334            session_id: Some("session123".to_string()),
1335            location: None,
1336            device_info: None,
1337            event_data: serde_json::json!({
1338                "previous_score": 0.3,
1339                "new_score": 0.85,
1340                "trigger": "suspicious_login_pattern"
1341            }),
1342            correlation_id: None,
1343        };
1344
1345        assert_eq!(event.subject, "user123");
1346        assert_eq!(event.risk_score, 0.85);
1347        assert!(matches!(event.severity, CaepEventSeverity::High));
1348    }
1349
1350    #[tokio::test]
1351    async fn test_caep_config_creation() {
1352        let config = CaepConfig::default();
1353        assert!(!config.event_stream_url.is_empty());
1354        assert!(config.auto_revoke);
1355        assert_eq!(config.auto_revoke_threshold, 0.8);
1356    }
1357
1358    #[tokio::test]
1359    async fn test_severity_comparison() {
1360        // Test severity level comparison logic
1361        let high_level = match CaepEventSeverity::High {
1362            CaepEventSeverity::Critical => 4,
1363            CaepEventSeverity::High => 3,
1364            CaepEventSeverity::Medium => 2,
1365            CaepEventSeverity::Low => 1,
1366        };
1367
1368        let medium_level = match CaepEventSeverity::Medium {
1369            CaepEventSeverity::Critical => 4,
1370            CaepEventSeverity::High => 3,
1371            CaepEventSeverity::Medium => 2,
1372            CaepEventSeverity::Low => 1,
1373        };
1374
1375        assert!(high_level > medium_level);
1376    }
1377}