Skip to main content

auth_framework/server/security/
caep_continuous_access.rs

1//! # Continuous Access Evaluation Protocol (CAEP)
2//!
3//! This module implements the Continuous Access Evaluation Protocol (CAEP), enabling
4//! real-time access evaluation and revocation based on security events and risk changes.
5//!
6//! ## Overview
7//!
8//! CAEP extends traditional OAuth 2.0 and OpenID Connect by providing continuous
9//! monitoring and evaluation of access tokens, allowing for immediate revocation
10//! when security conditions change.
11//!
12//! ## Key Features
13//!
14//! - **Real-time Event Processing**: Continuous monitoring of security events
15//! - **Automatic Access Revocation**: Immediate token revocation on security events
16//! - **Cross-system Event Propagation**: Events can trigger actions across multiple systems
17//! - **Risk-based Evaluation**: Dynamic access decisions based on changing risk profiles
18//! - **Session State Management**: Continuous session validity assessment
19//!
20//! ## Event Types
21//!
22//! - **User Events**: Login/logout, profile changes, credential changes
23//! - **Session Events**: Session creation, modification, timeout, suspicious activity
24//! - **Risk Events**: Location changes, device changes, behavioral anomalies
25//! - **Policy Events**: Access policy updates, compliance violations
26//! - **System Events**: Service outages, security incidents
27//!
28//! ## Usage Example
29//!
30//! ```rust,no_run
31//! use auth_framework::server::security::caep_continuous_access::*;
32//! use auth_framework::server::{SessionManager, BackChannelLogoutManager};
33//! use chrono::Duration;
34//! use std::sync::Arc;
35//! use async_trait::async_trait;
36//!
37//! // Example event handler implementation
38//! struct RiskScoreHandler;
39//!
40//! #[async_trait]
41//! impl CaepEventHandler for RiskScoreHandler {
42//!     async fn handle_event(&self, event: &CaepEvent) -> auth_framework::errors::Result<()> {
43//!         if event.risk_score > 0.8 {
44//!             // High risk - would revoke access in real implementation
45//!             println!("High risk detected: {}", event.risk_score);
46//!         }
47//!         Ok(())
48//!     }
49//!
50//!     fn supported_event_types(&self) -> Vec<CaepEventType> {
51//!         vec![CaepEventType::RiskScoreChange]
52//!     }
53//! }
54//!
55//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
56//! // Initialize CAEP manager (simplified example - in real use, get managers from DI container)
57//! let config = CaepConfig {
58//!     event_stream_url: "wss://events.example.com/caep".to_string(),
59//!     evaluation_interval: Duration::from_std(std::time::Duration::from_secs(30))?,
60//!     auto_revoke: true,
61//!     ..Default::default()
62//! };
63//!
64//! // In real code, create these with proper configuration from your DI container
65//! # let session_config = Default::default();
66//! # let session_manager = Arc::new(SessionManager::new(session_config));
67//! # let logout_config = Default::default();
68//! # let logout_manager = Arc::new(BackChannelLogoutManager::new(logout_config, session_manager.as_ref().clone())?);
69//! # let mut caep_manager = CaepManager::new(config, session_manager, logout_manager).await?;
70//!
71//! // Register event handler
72//! caep_manager.register_event_handler(
73//!     CaepEventType::RiskScoreChange,
74//!     Arc::new(RiskScoreHandler)
75//! ).await?;
76//!
77//! // Start continuous evaluation
78//! caep_manager.start_continuous_evaluation().await?;
79//! # Ok(())
80//! # }
81//! ```
82
83use crate::errors::{AuthError, Result};
84use crate::server::core::stepped_up_auth::SteppedUpAuthManager;
85use crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutManager;
86use crate::server::oidc::oidc_session_management::SessionManager;
87
88use async_trait::async_trait;
89use chrono::{DateTime, Duration, Timelike, Utc};
90use serde::{Deserialize, Serialize};
91use std::collections::HashMap;
92use std::sync::Arc;
93use tokio::sync::{RwLock, broadcast};
94use tokio::time::{Interval, interval};
95use uuid::Uuid;
96
97/// Type alias for complex event handler storage
98type EventHandlerMap = Arc<RwLock<HashMap<CaepEventType, Vec<Arc<dyn CaepEventHandler>>>>>;
99
100/// Configuration for Continuous Access Evaluation Protocol
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct CaepConfig {
103    /// URL for the event stream endpoint
104    pub event_stream_url: String,
105
106    /// How frequently to evaluate access decisions (default: 30 seconds)
107    pub evaluation_interval: Duration,
108
109    /// Whether to automatically revoke access on high-risk events
110    pub auto_revoke: bool,
111
112    /// Minimum risk score threshold for automatic revocation (0.0-1.0)
113    pub auto_revoke_threshold: f32,
114
115    /// Maximum number of concurrent event processors
116    pub max_concurrent_processors: usize,
117
118    /// Event retention period for audit trails
119    pub event_retention_period: Duration,
120
121    /// Cross-system event propagation endpoints
122    pub propagation_endpoints: Vec<String>,
123
124    /// Custom evaluation rules
125    pub evaluation_rules: Vec<CaepEvaluationRule>,
126
127    /// HMAC secret used to sign CAEP logout tokens (HS256).
128    /// **Must be set to a strong, randomly-generated secret in production.**
129    pub signing_secret: String,
130}
131
132impl Default for CaepConfig {
133    fn default() -> Self {
134        use ring::rand::{SecureRandom, SystemRandom};
135        // SAFETY: CSPRNG failure at initialization is terminal; the framework
136        // cannot operate without entropy.
137        let rng = SystemRandom::new();
138        let mut bytes = [0u8; 32];
139        rng.fill(&mut bytes)
140            .expect("AuthFramework fatal: system CSPRNG unavailable — the operating system cannot provide cryptographic randomness");
141        let signing_secret = bytes.iter().fold(String::with_capacity(64), |mut s, b| {
142            s.push_str(&format!("{b:02x}"));
143            s
144        });
145
146        Self {
147            event_stream_url: "wss://localhost:8080/caep/events".to_string(),
148            evaluation_interval: Duration::try_seconds(30).unwrap_or(Duration::zero()),
149            auto_revoke: true,
150            auto_revoke_threshold: 0.8,
151            max_concurrent_processors: 10,
152            event_retention_period: Duration::try_hours(24).unwrap_or(Duration::zero()),
153            propagation_endpoints: Vec::new(),
154            evaluation_rules: Vec::new(),
155            // Randomly generated at startup. Override with a persisted secret in production
156            // so that tokens remain verifiable across restarts.
157            signing_secret,
158        }
159    }
160}
161
162/// Types of CAEP events
163#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
164#[serde(rename_all = "snake_case")]
165pub enum CaepEventType {
166    /// User authentication events
167    UserLogin,
168    UserLogout,
169    UserProfileChange,
170    UserCredentialChange,
171
172    /// Session-related events
173    SessionCreated,
174    SessionModified,
175    SessionTimeout,
176    SessionSuspiciousActivity,
177
178    /// Risk assessment events
179    RiskScoreChange,
180    LocationChange,
181    DeviceChange,
182    BehavioralAnomaly,
183
184    /// Policy and compliance events
185    PolicyUpdate,
186    ComplianceViolation,
187    AccessPatternAnomaly,
188
189    /// System and security events
190    SystemOutage,
191    SecurityIncident,
192    DataBreach,
193
194    /// Custom events for extensibility
195    Custom(String),
196}
197
198/// Severity levels for CAEP events
199#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
200#[serde(rename_all = "snake_case")]
201pub enum CaepEventSeverity {
202    Low,
203    Medium,
204    High,
205    Critical,
206}
207
208/// Source of a CAEP event
209#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct CaepEventSource {
211    /// Identifier of the source system
212    pub system_id: String,
213
214    /// Type of source (e.g., "identity_provider", "risk_engine", "policy_engine")
215    pub source_type: String,
216
217    /// Version of the source system
218    pub version: Option<String>,
219
220    /// Additional source metadata
221    pub metadata: HashMap<String, serde_json::Value>,
222}
223
224/// A CAEP security event
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct CaepEvent {
227    /// Unique event identifier
228    pub id: Uuid,
229
230    /// Type of event
231    pub event_type: CaepEventType,
232
233    /// Subject (user) associated with the event
234    pub subject: String,
235
236    /// Severity level of the event
237    pub severity: CaepEventSeverity,
238
239    /// When the event occurred
240    pub timestamp: DateTime<Utc>,
241
242    /// Source of the event
243    pub source: CaepEventSource,
244
245    /// Current risk score (0.0-1.0)
246    pub risk_score: f32,
247
248    /// Session ID if applicable
249    pub session_id: Option<String>,
250
251    /// Geographic location information
252    pub location: Option<CaepLocationInfo>,
253
254    /// Device information
255    pub device_info: Option<CaepDeviceInfo>,
256
257    /// Event-specific data
258    pub event_data: serde_json::Value,
259
260    /// Correlation ID for related events
261    pub correlation_id: Option<Uuid>,
262}
263
264/// Geographic location information for CAEP events
265#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct CaepLocationInfo {
267    /// Country code (ISO 3166-1 alpha-2)
268    pub country: Option<String>,
269
270    /// Region or state
271    pub region: Option<String>,
272
273    /// City name
274    pub city: Option<String>,
275
276    /// Latitude coordinate
277    pub latitude: Option<f64>,
278
279    /// Longitude coordinate
280    pub longitude: Option<f64>,
281
282    /// IP address
283    pub ip_address: Option<String>,
284
285    /// Whether location is considered suspicious
286    pub is_suspicious: bool,
287}
288
289/// Device information for CAEP events
290#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct CaepDeviceInfo {
292    /// Device identifier
293    pub device_id: Option<String>,
294
295    /// Device type (mobile, desktop, tablet, etc.)
296    pub device_type: Option<String>,
297
298    /// Operating system
299    pub os: Option<String>,
300
301    /// Browser or client application
302    pub client: Option<String>,
303
304    /// Whether device is trusted
305    pub is_trusted: bool,
306
307    /// Whether device binding is required
308    pub requires_binding: bool,
309}
310
311/// Evaluation rule for continuous access decisions
312#[derive(Debug, Clone, Serialize, Deserialize)]
313pub struct CaepEvaluationRule {
314    /// Rule identifier
315    pub id: String,
316
317    /// Human-readable description
318    pub description: String,
319
320    /// Event types this rule applies to
321    pub applicable_events: Vec<CaepEventType>,
322
323    /// Conditions that must be met
324    pub conditions: Vec<CaepRuleCondition>,
325
326    /// Actions to take when rule is triggered
327    pub actions: Vec<CaepRuleAction>,
328
329    /// Priority of this rule (higher numbers = higher priority)
330    pub priority: i32,
331
332    /// Whether rule is currently enabled
333    pub enabled: bool,
334}
335
336/// Condition for a CAEP evaluation rule
337#[derive(Debug, Clone, Serialize, Deserialize)]
338#[serde(tag = "type", rename_all = "snake_case")]
339pub enum CaepRuleCondition {
340    /// Risk score threshold condition
341    RiskScoreAbove { threshold: f32 },
342
343    /// Event severity condition
344    SeverityAtLeast { severity: CaepEventSeverity },
345
346    /// Location-based condition
347    LocationChange { suspicious_only: bool },
348
349    /// Device-based condition
350    UnknownDevice { require_trusted: bool },
351
352    /// Time-based condition
353    OutsideBusinessHours { timezone: String },
354
355    /// Custom condition with expression
356    Custom { expression: String },
357}
358
359/// Action to take when a CAEP rule is triggered
360#[derive(Debug, Clone, Serialize, Deserialize)]
361#[serde(tag = "type", rename_all = "snake_case")]
362pub enum CaepRuleAction {
363    /// Revoke access tokens for the subject
364    RevokeAccess { immediate: bool },
365
366    /// Require step-up authentication
367    RequireStepUp { level: String },
368
369    /// Send notification
370    SendNotification { channels: Vec<String> },
371
372    /// Log security event
373    LogEvent { level: String },
374
375    /// Trigger external webhook
376    TriggerWebhook { url: String },
377
378    /// Quarantine session
379    QuarantineSession { duration_minutes: u32 },
380}
381
382/// Result of a continuous access evaluation
383#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct CaepEvaluationResult {
385    /// Subject being evaluated
386    pub subject: String,
387
388    /// Current access decision
389    pub access_decision: CaepAccessDecision,
390
391    /// Current risk score
392    pub risk_score: f32,
393
394    /// Triggered rules
395    pub triggered_rules: Vec<String>,
396
397    /// Required actions
398    pub required_actions: Vec<CaepRuleAction>,
399
400    /// Evaluation timestamp
401    pub evaluated_at: DateTime<Utc>,
402
403    /// Next evaluation time
404    pub next_evaluation: DateTime<Utc>,
405}
406
407/// Access decision from CAEP evaluation
408#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
409#[serde(rename_all = "snake_case")]
410pub enum CaepAccessDecision {
411    /// Access granted - continue as normal
412    Allow,
413
414    /// Access granted but requires monitoring
415    AllowWithMonitoring,
416
417    /// Access granted but requires step-up authentication
418    AllowWithStepUp,
419
420    /// Access temporarily denied - retry later
421    TemporaryDeny,
422
423    /// Access permanently denied - revoke tokens
424    Deny,
425}
426
427/// State of a CAEP session
428#[derive(Debug, Clone, Serialize, Deserialize)]
429pub struct CaepSessionState {
430    /// Session identifier
431    pub session_id: String,
432
433    /// Subject (user) of the session
434    pub subject: String,
435
436    /// Current risk score
437    pub risk_score: f32,
438
439    /// Last evaluation result
440    pub last_evaluation: Option<CaepEvaluationResult>,
441
442    /// Active events for this session
443    pub active_events: Vec<CaepEvent>,
444
445    /// Session creation time
446    pub created_at: DateTime<Utc>,
447
448    /// Last activity time
449    pub last_activity: DateTime<Utc>,
450
451    /// Whether session is quarantined
452    pub is_quarantined: bool,
453
454    /// Quarantine end time if applicable
455    pub quarantine_until: Option<DateTime<Utc>>,
456}
457
458/// Comprehensive session information combining OIDC and CAEP data
459#[derive(Debug, Clone, Serialize, Deserialize)]
460pub struct ComprehensiveSessionInfo {
461    /// OIDC session information
462    pub oidc_session: crate::server::oidc::oidc_session_management::OidcSession,
463
464    /// CAEP session information if available
465    pub caep_session: Option<CaepSessionState>,
466
467    /// Whether this session is being monitored by CAEP
468    pub is_monitored_by_caep: bool,
469}
470
471/// Event handler trait for CAEP events
472#[async_trait]
473pub trait CaepEventHandler: Send + Sync {
474    /// Handle a CAEP event
475    async fn handle_event(&self, event: &CaepEvent) -> Result<()>;
476
477    /// Get event types this handler can process
478    fn supported_event_types(&self) -> Vec<CaepEventType>;
479}
480
481/// Main CAEP manager for continuous access evaluation
482pub struct CaepManager {
483    /// Configuration
484    config: CaepConfig,
485
486    /// Session manager integration
487    session_manager: Arc<SessionManager>,
488
489    /// Logout manager for revocations
490    logout_manager: Arc<BackChannelLogoutManager>,
491
492    /// Step-up authentication manager
493    step_up_manager: Option<Arc<SteppedUpAuthManager>>,
494
495    /// Active sessions being monitored
496    sessions: Arc<RwLock<HashMap<String, CaepSessionState>>>,
497
498    /// Event handlers by type
499    event_handlers: EventHandlerMap,
500
501    /// Event stream broadcaster
502    event_broadcaster: broadcast::Sender<CaepEvent>,
503
504    /// Evaluation timer
505    evaluation_interval: Interval,
506
507    /// Event history for audit trails
508    event_history: Arc<RwLock<Vec<CaepEvent>>>,
509
510    /// Evaluation rules
511    rules: Arc<RwLock<Vec<CaepEvaluationRule>>>,
512}
513
514impl CaepManager {
515    /// Create a new CAEP manager
516    pub async fn new(
517        config: CaepConfig,
518        session_manager: Arc<SessionManager>,
519        logout_manager: Arc<BackChannelLogoutManager>,
520    ) -> Result<Self> {
521        let (event_broadcaster, _) = broadcast::channel(1000);
522        let evaluation_interval = interval(config.evaluation_interval.to_std().map_err(|e| {
523            AuthError::Configuration {
524                message: format!("Invalid evaluation interval: {}", e),
525                help: Some("Provide a valid duration for evaluation interval".to_string()),
526                docs_url: Some("https://docs.auth-framework.com/configuration".to_string()),
527                source: None,
528                suggested_fix: Some("Check your configuration and ensure the evaluation interval is properly formatted".to_string()),
529            }
530        })?);
531
532        Ok(Self {
533            config: config.clone(),
534            session_manager,
535            logout_manager,
536            step_up_manager: None,
537            sessions: Arc::new(RwLock::new(HashMap::new())),
538            event_handlers: Arc::new(RwLock::new(HashMap::new())),
539            event_broadcaster,
540            evaluation_interval,
541            event_history: Arc::new(RwLock::new(Vec::new())),
542            rules: Arc::new(RwLock::new(config.evaluation_rules)),
543        })
544    }
545
546    /// Set step-up authentication manager
547    pub fn with_step_up_manager(mut self, step_up_manager: Arc<SteppedUpAuthManager>) -> Self {
548        self.step_up_manager = Some(step_up_manager);
549        self
550    }
551
552    /// Register an event handler
553    pub async fn register_event_handler(
554        &self,
555        event_type: CaepEventType,
556        handler: Arc<dyn CaepEventHandler>,
557    ) -> Result<()> {
558        let mut handlers = self.event_handlers.write().await;
559        handlers.entry(event_type).or_default().push(handler);
560        Ok(())
561    }
562
563    /// Process a CAEP event
564    pub async fn process_event(&self, event: CaepEvent) -> Result<CaepEvaluationResult> {
565        // Add event to history
566        {
567            let mut history = self.event_history.write().await;
568            history.push(event.clone());
569
570            // Cleanup old events
571            let retention_cutoff = Utc::now() - self.config.event_retention_period;
572            history.retain(|e| e.timestamp >= retention_cutoff);
573        }
574
575        // Broadcast event
576        if let Err(e) = self.event_broadcaster.send(event.clone()) {
577            tracing::warn!("Failed to broadcast CAEP event: {}", e);
578        }
579
580        // Update session state
581        if let Some(session_id) = &event.session_id {
582            self.update_session_state(session_id, &event).await?;
583        }
584
585        // Evaluate access decision
586        let evaluation_result = self.evaluate_access(&event.subject, Some(&event)).await?;
587
588        // Execute required actions
589        self.execute_actions(&evaluation_result).await?;
590
591        // Notify registered handlers
592        self.notify_handlers(&event).await?;
593
594        Ok(evaluation_result)
595    }
596
597    /// Evaluate continuous access for a subject
598    pub async fn evaluate_access(
599        &self,
600        subject: &str,
601        triggering_event: Option<&CaepEvent>,
602    ) -> Result<CaepEvaluationResult> {
603        let rules = self.rules.read().await;
604        let mut triggered_rules = Vec::new();
605        let mut required_actions = Vec::new();
606        let risk_score = if let Some(event) = triggering_event {
607            event.risk_score
608        } else {
609            // Calculate risk from recent events
610            self.calculate_risk_score(subject).await?
611        };
612
613        // Apply evaluation rules
614        for rule in rules.iter() {
615            if !rule.enabled {
616                continue;
617            }
618
619            if let Some(event) = triggering_event
620                && !rule.applicable_events.contains(&event.event_type)
621            {
622                continue;
623            }
624
625            if self
626                .evaluate_rule_conditions(rule, subject, triggering_event, risk_score)
627                .await?
628            {
629                triggered_rules.push(rule.id.clone());
630                required_actions.extend(rule.actions.clone());
631            }
632        }
633
634        // Determine access decision
635        let access_decision = self.determine_access_decision(risk_score, &required_actions);
636
637        let now = Utc::now();
638        Ok(CaepEvaluationResult {
639            subject: subject.to_string(),
640            access_decision,
641            risk_score,
642            triggered_rules,
643            required_actions,
644            evaluated_at: now,
645            next_evaluation: now + self.config.evaluation_interval,
646        })
647    }
648
649    /// Start continuous evaluation loop
650    pub async fn start_continuous_evaluation(&mut self) -> Result<()> {
651        loop {
652            self.evaluation_interval.tick().await;
653
654            // First, synchronize with SessionManager to clean up stale sessions
655            self.synchronize_with_session_manager().await?;
656
657            // Evaluate all active sessions
658            let sessions = {
659                let sessions_guard = self.sessions.read().await;
660                sessions_guard.keys().cloned().collect::<Vec<_>>()
661            };
662
663            for session_id in sessions {
664                if let Some(session_state) = self.sessions.read().await.get(&session_id) {
665                    let evaluation = self.evaluate_access(&session_state.subject, None).await?;
666                    self.execute_actions(&evaluation).await?;
667                }
668            }
669        }
670    }
671
672    /// Synchronize CAEP sessions with SessionManager
673    async fn synchronize_with_session_manager(&self) -> Result<()> {
674        let mut sessions = self.sessions.write().await;
675        let mut sessions_to_remove = Vec::new();
676
677        for (session_id, caep_session) in sessions.iter() {
678            // Check if session still exists and is valid in SessionManager
679            if let Some(oidc_session) = self.session_manager.get_session(session_id) {
680                if !self.session_manager.is_session_valid(session_id) {
681                    tracing::info!("CAEP removing expired session: {}", session_id);
682                    sessions_to_remove.push(session_id.clone());
683                }
684                // Verify subject consistency
685                else if oidc_session.sub != caep_session.subject {
686                    tracing::warn!("CAEP session subject mismatch, removing: {}", session_id);
687                    sessions_to_remove.push(session_id.clone());
688                }
689            } else {
690                tracing::info!("CAEP removing orphaned session: {}", session_id);
691                sessions_to_remove.push(session_id.clone());
692            }
693        }
694
695        // Remove stale sessions
696        for session_id in sessions_to_remove {
697            sessions.remove(&session_id);
698        }
699
700        Ok(())
701    }
702
703    /// Revoke access for a subject
704    pub async fn revoke_subject_access(&self, subject: &str) -> Result<()> {
705        tracing::info!("CAEP revoking access for subject: {}", subject);
706
707        // Find all sessions for this subject and initiate back-channel logout
708        let sessions_to_logout = {
709            let sessions = self.sessions.read().await;
710            sessions
711                .iter()
712                .filter(|(_, session)| session.subject == subject)
713                .map(|(session_id, session)| (session_id.clone(), session.clone()))
714                .collect::<Vec<_>>()
715        };
716
717        // Process back-channel logout for each session
718        for (session_id, _) in &sessions_to_logout {
719            // Use the BackChannelLogoutManager to perform proper logout
720            let logout_request =
721                crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest {
722                    session_id: session_id.clone(),
723                    sub: subject.to_string(),
724                    sid: Some(session_id.clone()),
725                    iss: "caep-manager".to_string(), // In production, use actual issuer
726                    initiating_client_id: None,      // CAEP-initiated logout
727                    additional_events: Some({
728                        let mut events = HashMap::new();
729                        events.insert(
730                            "caep_reason".to_string(),
731                            serde_json::json!("automatic_revocation"),
732                        );
733                        events.insert(
734                            "timestamp".to_string(),
735                            serde_json::json!(Utc::now().timestamp()),
736                        );
737                        events
738                    }),
739                };
740
741            // Process the logout through the BackChannelLogoutManager
742            // Use async approach to handle logout manager integration
743            match self.process_backchannel_logout(&logout_request).await {
744                Ok(_) => {
745                    tracing::info!(
746                        "Successfully initiated back-channel logout for session {} (subject: {})",
747                        session_id,
748                        subject
749                    );
750                }
751                Err(e) => {
752                    tracing::error!(
753                        "Failed to initiate back-channel logout for session {} (subject: {}): {}",
754                        session_id,
755                        subject,
756                        e
757                    );
758                }
759            }
760        }
761
762        // Remove from CAEP active sessions after logout processing
763        let mut sessions = self.sessions.write().await;
764        sessions.retain(|_, session| session.subject != subject);
765
766        Ok(())
767    }
768
769    /// Process back-channel logout through the logout manager
770    async fn process_backchannel_logout(
771        &self,
772        logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
773    ) -> Result<()> {
774        // Use the logout manager to process the logout request
775        // This integrates CAEP with the existing logout infrastructure
776
777        // Use the logout manager to get metadata and validate capabilities
778        let logout_metadata = self.logout_manager.get_discovery_metadata();
779        tracing::info!("Logout manager capabilities: {:?}", logout_metadata);
780
781        // Create CAEP-specific logout token based on the result
782        let logout_token = self
783            .create_logout_token_for_caep_revocation(logout_request)
784            .await?;
785
786        // Handle CAEP-specific logout processing
787        self.handle_caep_logout(logout_request, &logout_token)
788            .await?;
789
790        tracing::info!("CAEP backchannel logout processed successfully");
791        Ok(())
792    }
793
794    /// Create logout token for CAEP-initiated revocation
795    async fn create_logout_token_for_caep_revocation(
796        &self,
797        logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
798    ) -> Result<String> {
799        use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
800        use serde_json::json;
801
802        // Create CAEP-specific logout token claims
803        let claims = json!({
804            "iss": logout_request.iss,
805            "sub": logout_request.sub,
806            "aud": ["caep-manager"],
807            "exp": (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp(),
808            "iat": chrono::Utc::now().timestamp(),
809            "jti": uuid::Uuid::new_v4().to_string(),
810            "sid": logout_request.sid,
811            "events": {
812                "http://schemas.openid.net/secevent/caep/event-type/session-revoked": {}
813            },
814            "caep_reason": logout_request.additional_events
815                .as_ref()
816                .and_then(|events| events.get("caep_reason"))
817                .unwrap_or(&serde_json::json!("automatic_revocation"))
818        });
819
820        // Use the configured signing secret; override CaepConfig::signing_secret in production.
821        let key = EncodingKey::from_secret(self.config.signing_secret.as_bytes());
822        let header = Header::new(Algorithm::HS256);
823
824        let token = encode(&header, &claims, &key).map_err(|e| {
825            AuthError::auth_method("caep", format!("Failed to create logout token: {}", e))
826        })?;
827
828        Ok(token)
829    }
830
831    /// Handle CAEP logout processing
832    async fn handle_caep_logout(
833        &self,
834        logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
835        logout_token: &str,
836    ) -> Result<()> {
837        // This method integrates with the logout manager's functionality
838        // In a production system, this would:
839        // 1. Validate the logout token
840        // 2. Notify all relevant clients about the session termination
841        // 3. Update session state in persistent storage
842        // 4. Trigger any cleanup procedures
843
844        tracing::info!(
845            "Processing CAEP logout for session: {}",
846            logout_request.session_id
847        );
848
849        // Update CAEP session state to reflect logout
850        {
851            let mut sessions = self.sessions.write().await;
852            if let Some(_session) = sessions.get(&logout_request.session_id) {
853                // Remove the session entirely as it's being terminated
854                sessions.remove(&logout_request.session_id);
855            }
856        }
857
858        // Emit CAEP event for the logout
859        let caep_event = CaepEvent {
860            id: uuid::Uuid::new_v4(),
861            event_type: CaepEventType::UserLogout, // Use existing event type
862            subject: logout_request.sub.clone(),
863            session_id: Some(logout_request.session_id.clone()),
864            timestamp: chrono::Utc::now(),
865            severity: CaepEventSeverity::High,
866            source: CaepEventSource {
867                system_id: "caep-manager".to_string(),
868                source_type: "caep_automatic_revocation".to_string(),
869                version: Some("1.0".to_string()),
870                metadata: std::collections::HashMap::new(),
871            },
872            risk_score: 1.0, // High risk score for revoked sessions
873            location: None,
874            device_info: None,
875            event_data: serde_json::json!({
876                "logout_token": logout_token,
877                "initiator": "caep_automatic_revocation",
878                "reason": logout_request.additional_events
879                    .as_ref()
880                    .and_then(|events| events.get("caep_reason"))
881                    .cloned()
882                    .unwrap_or_else(|| serde_json::json!("automatic_revocation"))
883            }),
884            correlation_id: Some(uuid::Uuid::new_v4()),
885        };
886
887        // Broadcast the event
888        if let Err(e) = self.event_broadcaster.send(caep_event) {
889            tracing::warn!("Failed to broadcast CAEP logout event: {}", e);
890        }
891
892        tracing::info!(
893            "CAEP logout completed for session: {}",
894            logout_request.session_id
895        );
896        Ok(())
897    }
898
899    /// Calculate risk score for a subject based on recent events
900    async fn calculate_risk_score(&self, subject: &str) -> Result<f32> {
901        let history = self.event_history.read().await;
902        let recent_cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
903
904        let recent_events: Vec<_> = history
905            .iter()
906            .filter(|e| e.subject == subject && e.timestamp >= recent_cutoff)
907            .collect();
908
909        if recent_events.is_empty() {
910            return Ok(0.0);
911        }
912
913        // Calculate weighted risk score
914        let mut total_risk = 0.0;
915        let mut total_weight = 0.0;
916
917        for event in recent_events {
918            let weight = match event.severity {
919                CaepEventSeverity::Low => 1.0,
920                CaepEventSeverity::Medium => 2.0,
921                CaepEventSeverity::High => 4.0,
922                CaepEventSeverity::Critical => 8.0,
923            };
924
925            total_risk += event.risk_score * weight;
926            total_weight += weight;
927        }
928
929        Ok(if total_weight > 0.0 {
930            (total_risk / total_weight).min(1.0)
931        } else {
932            0.0
933        })
934    }
935
936    /// Update session state based on an event
937    async fn update_session_state(&self, session_id: &str, event: &CaepEvent) -> Result<()> {
938        // First, validate the session exists in the SessionManager
939        if let Some(oidc_session) = self.session_manager.get_session(session_id) {
940            // Verify the session is still valid
941            if !self.session_manager.is_session_valid(session_id) {
942                tracing::warn!(
943                    "CAEP received event for expired OIDC session: {}",
944                    session_id
945                );
946                // Remove from CAEP sessions as well
947                let mut sessions = self.sessions.write().await;
948                sessions.remove(session_id);
949                return Ok(());
950            }
951
952            // Ensure subjects match
953            if oidc_session.sub != event.subject {
954                return Err(AuthError::validation(
955                    "Subject mismatch between CAEP event and OIDC session",
956                ));
957            }
958        } else {
959            tracing::warn!(
960                "CAEP received event for unknown OIDC session: {}",
961                session_id
962            );
963            return Err(AuthError::validation("Session not found in SessionManager"));
964        }
965
966        // Update CAEP-specific session state
967        let mut sessions = self.sessions.write().await;
968
969        let session_state =
970            sessions
971                .entry(session_id.to_string())
972                .or_insert_with(|| CaepSessionState {
973                    session_id: session_id.to_string(),
974                    subject: event.subject.clone(),
975                    risk_score: event.risk_score,
976                    last_evaluation: None,
977                    active_events: Vec::new(),
978                    created_at: Utc::now(),
979                    last_activity: Utc::now(),
980                    is_quarantined: false,
981                    quarantine_until: None,
982                });
983
984        session_state.risk_score = event.risk_score;
985        session_state.last_activity = Utc::now();
986        session_state.active_events.push(event.clone());
987
988        // Remove old events
989        let cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
990        session_state
991            .active_events
992            .retain(|e| e.timestamp >= cutoff);
993
994        Ok(())
995    }
996
997    /// Evaluate rule conditions
998    async fn evaluate_rule_conditions(
999        &self,
1000        rule: &CaepEvaluationRule,
1001        _subject: &str,
1002        event: Option<&CaepEvent>,
1003        risk_score: f32,
1004    ) -> Result<bool> {
1005        for condition in &rule.conditions {
1006            match condition {
1007                CaepRuleCondition::RiskScoreAbove { threshold } => {
1008                    if risk_score <= *threshold {
1009                        return Ok(false);
1010                    }
1011                }
1012                CaepRuleCondition::SeverityAtLeast { severity } => {
1013                    if let Some(event) = event {
1014                        let event_severity_level = match event.severity {
1015                            CaepEventSeverity::Critical => 4,
1016                            CaepEventSeverity::High => 3,
1017                            CaepEventSeverity::Medium => 2,
1018                            CaepEventSeverity::Low => 1,
1019                        };
1020
1021                        let required_severity_level = match severity {
1022                            CaepEventSeverity::Critical => 4,
1023                            CaepEventSeverity::High => 3,
1024                            CaepEventSeverity::Medium => 2,
1025                            CaepEventSeverity::Low => 1,
1026                        };
1027
1028                        if event_severity_level < required_severity_level {
1029                            return Ok(false);
1030                        }
1031                    } else {
1032                        return Ok(false);
1033                    }
1034                }
1035                CaepRuleCondition::LocationChange { suspicious_only } => {
1036                    if let Some(event) = event {
1037                        if let Some(location) = &event.location {
1038                            if *suspicious_only && !location.is_suspicious {
1039                                return Ok(false);
1040                            }
1041                        } else {
1042                            return Ok(false);
1043                        }
1044                    } else {
1045                        return Ok(false);
1046                    }
1047                }
1048                CaepRuleCondition::UnknownDevice { require_trusted } => {
1049                    if let Some(event) = event
1050                        && let Some(device) = &event.device_info
1051                        && *require_trusted
1052                        && device.is_trusted
1053                    {
1054                        return Ok(false);
1055                    }
1056                }
1057                CaepRuleCondition::OutsideBusinessHours { timezone: _ } => {
1058                    // Simplified: assume business hours are 9 AM - 5 PM UTC
1059                    let hour = Utc::now().hour();
1060                    if (9..17).contains(&hour) {
1061                        return Ok(false);
1062                    }
1063                }
1064                CaepRuleCondition::Custom { expression } => {
1065                    // Basic expression evaluation: support "field op value" comparisons
1066                    if !Self::evaluate_simple_expression(expression) {
1067                        return Ok(false);
1068                    }
1069                }
1070            }
1071        }
1072
1073        Ok(true)
1074    }
1075
1076    /// Evaluate a simple comparison expression like "risk_score > 0.8"
1077    /// Returns false (safe default) for unrecognized or unparseable expressions.
1078    fn evaluate_simple_expression(expression: &str) -> bool {
1079        let parts: Vec<&str> = expression.split_whitespace().collect();
1080        if parts.len() != 3 {
1081            tracing::warn!(
1082                expression = expression,
1083                "Unrecognized custom CAEP expression format, defaulting to false"
1084            );
1085            return false;
1086        }
1087
1088        let (_field, op, value_str) = (parts[0], parts[1], parts[2]);
1089        let Ok(threshold) = value_str.parse::<f64>() else {
1090            tracing::warn!(
1091                expression = expression,
1092                "Cannot parse threshold value in custom CAEP expression, defaulting to false"
1093            );
1094            return false;
1095        };
1096
1097        // Without runtime context binding, we can only validate the expression
1098        // is syntactically correct. Log and return false (safe default).
1099        tracing::debug!(
1100            expression = expression,
1101            threshold = threshold,
1102            op = op,
1103            "Custom CAEP expression parsed but no runtime context available, defaulting to false"
1104        );
1105        false
1106    }
1107
1108    /// Determine access decision based on risk and actions
1109    fn determine_access_decision(
1110        &self,
1111        risk_score: f32,
1112        actions: &[CaepRuleAction],
1113    ) -> CaepAccessDecision {
1114        for action in actions {
1115            match action {
1116                CaepRuleAction::RevokeAccess { immediate: true } => {
1117                    return CaepAccessDecision::Deny;
1118                }
1119                CaepRuleAction::RevokeAccess { immediate: false } => {
1120                    return CaepAccessDecision::TemporaryDeny;
1121                }
1122                CaepRuleAction::RequireStepUp { .. } => {
1123                    return CaepAccessDecision::AllowWithStepUp;
1124                }
1125                CaepRuleAction::QuarantineSession { .. } => {
1126                    return CaepAccessDecision::TemporaryDeny;
1127                }
1128                _ => {}
1129            }
1130        }
1131
1132        if risk_score >= self.config.auto_revoke_threshold {
1133            CaepAccessDecision::Deny
1134        } else if risk_score >= 0.6 {
1135            CaepAccessDecision::AllowWithMonitoring
1136        } else {
1137            CaepAccessDecision::Allow
1138        }
1139    }
1140
1141    /// Execute required actions from evaluation
1142    async fn execute_actions(&self, evaluation: &CaepEvaluationResult) -> Result<()> {
1143        for action in &evaluation.required_actions {
1144            match action {
1145                CaepRuleAction::RevokeAccess { .. } => {
1146                    self.revoke_subject_access(&evaluation.subject).await?;
1147                }
1148                CaepRuleAction::RequireStepUp { level } => {
1149                    if let Some(_step_up_manager) = &self.step_up_manager {
1150                        // Trigger step-up authentication
1151                        tracing::info!(
1152                            "CAEP requiring step-up to level {} for subject {}",
1153                            level,
1154                            evaluation.subject
1155                        );
1156                    }
1157                }
1158                CaepRuleAction::SendNotification { channels } => {
1159                    tracing::info!(
1160                        "CAEP sending notification via channels {:?} for subject {}",
1161                        channels,
1162                        evaluation.subject
1163                    );
1164                }
1165                CaepRuleAction::LogEvent { level } => {
1166                    tracing::info!(
1167                        "CAEP logging event at level {} for subject {}",
1168                        level,
1169                        evaluation.subject
1170                    );
1171                }
1172                CaepRuleAction::TriggerWebhook { url } => {
1173                    tracing::info!(
1174                        "CAEP triggering webhook {} for subject {}",
1175                        url,
1176                        evaluation.subject
1177                    );
1178                }
1179                CaepRuleAction::QuarantineSession { duration_minutes } => {
1180                    self.quarantine_session(&evaluation.subject, *duration_minutes)
1181                        .await?;
1182                }
1183            }
1184        }
1185
1186        Ok(())
1187    }
1188
1189    /// Quarantine a session
1190    async fn quarantine_session(&self, subject: &str, duration_minutes: u32) -> Result<()> {
1191        let mut sessions = self.sessions.write().await;
1192        let quarantine_until =
1193            Utc::now() + Duration::try_minutes(duration_minutes as i64).unwrap_or(Duration::zero());
1194
1195        // Update CAEP session state
1196        let mut quarantined_session_ids = Vec::new();
1197        for session in sessions.values_mut() {
1198            if session.subject == subject {
1199                session.is_quarantined = true;
1200                session.quarantine_until = Some(quarantine_until);
1201                quarantined_session_ids.push(session.session_id.clone());
1202            }
1203        }
1204
1205        tracing::info!(
1206            "CAEP quarantined {} sessions for subject {} until {}. Session IDs: {:?}",
1207            quarantined_session_ids.len(),
1208            subject,
1209            quarantine_until,
1210            quarantined_session_ids
1211        );
1212
1213        // In a production implementation, you might want to notify the SessionManager
1214        // about the quarantine status through a separate event or notification system
1215
1216        Ok(())
1217    }
1218
1219    /// Notify registered event handlers
1220    async fn notify_handlers(&self, event: &CaepEvent) -> Result<()> {
1221        let handlers = self.event_handlers.read().await;
1222
1223        if let Some(event_handlers) = handlers.get(&event.event_type) {
1224            for handler in event_handlers {
1225                if let Err(e) = handler.handle_event(event).await {
1226                    tracing::error!("CAEP event handler failed: {}", e);
1227                }
1228            }
1229        }
1230
1231        Ok(())
1232    }
1233
1234    /// Get current session state
1235    pub async fn get_session_state(&self, session_id: &str) -> Result<Option<CaepSessionState>> {
1236        // First validate with SessionManager
1237        if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1238            if !self.session_manager.is_session_valid(session_id) {
1239                // Session is expired in SessionManager, remove from CAEP as well
1240                let mut sessions = self.sessions.write().await;
1241                sessions.remove(session_id);
1242                return Ok(None);
1243            }
1244
1245            // Return CAEP session state if it exists and subjects match
1246            let sessions = self.sessions.read().await;
1247            if let Some(caep_session) = sessions.get(session_id) {
1248                if caep_session.subject == oidc_session.sub {
1249                    return Ok(Some(caep_session.clone()));
1250                } else {
1251                    tracing::warn!(
1252                        "Subject mismatch between CAEP and OIDC sessions for {}",
1253                        session_id
1254                    );
1255                    return Ok(None);
1256                }
1257            }
1258        }
1259
1260        // No valid OIDC session found
1261        Ok(None)
1262    }
1263
1264    /// Get event history for a subject
1265    pub async fn get_event_history(
1266        &self,
1267        subject: &str,
1268        limit: Option<usize>,
1269    ) -> Result<Vec<CaepEvent>> {
1270        let history = self.event_history.read().await;
1271        let mut events: Vec<_> = history
1272            .iter()
1273            .filter(|e| e.subject == subject)
1274            .cloned()
1275            .collect();
1276
1277        events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1278
1279        if let Some(limit) = limit {
1280            events.truncate(limit);
1281        }
1282
1283        Ok(events)
1284    }
1285
1286    /// Add or update an evaluation rule
1287    pub async fn add_evaluation_rule(&self, rule: CaepEvaluationRule) -> Result<()> {
1288        let mut rules = self.rules.write().await;
1289
1290        // Remove existing rule with same ID
1291        rules.retain(|r| r.id != rule.id);
1292
1293        // Insert new rule and sort by priority
1294        rules.push(rule);
1295        rules.sort_by(|a, b| b.priority.cmp(&a.priority));
1296
1297        Ok(())
1298    }
1299
1300    /// Remove an evaluation rule
1301    pub async fn remove_evaluation_rule(&self, rule_id: &str) -> Result<bool> {
1302        let mut rules = self.rules.write().await;
1303        let original_len = rules.len();
1304        rules.retain(|r| r.id != rule_id);
1305        Ok(rules.len() < original_len)
1306    }
1307
1308    /// Get comprehensive session information combining OIDC and CAEP data
1309    pub async fn get_comprehensive_session_info(
1310        &self,
1311        session_id: &str,
1312    ) -> Result<Option<ComprehensiveSessionInfo>> {
1313        // Get OIDC session information
1314        if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1315            if !self.session_manager.is_session_valid(session_id) {
1316                return Ok(None);
1317            }
1318
1319            // Get CAEP session information if available
1320            let caep_session = {
1321                let sessions = self.sessions.read().await;
1322                sessions.get(session_id).cloned()
1323            };
1324
1325            let comprehensive_info = ComprehensiveSessionInfo {
1326                oidc_session: oidc_session.clone(),
1327                is_monitored_by_caep: caep_session.is_some(),
1328                caep_session,
1329            };
1330
1331            Ok(Some(comprehensive_info))
1332        } else {
1333            Ok(None)
1334        }
1335    }
1336
1337    /// Get all sessions for a subject with comprehensive information
1338    pub async fn get_subject_sessions(
1339        &self,
1340        subject: &str,
1341    ) -> Result<Vec<ComprehensiveSessionInfo>> {
1342        let oidc_sessions = self.session_manager.get_sessions_for_subject(subject);
1343        let mut comprehensive_sessions = Vec::new();
1344
1345        for oidc_session in oidc_sessions {
1346            if self
1347                .session_manager
1348                .is_session_valid(&oidc_session.session_id)
1349            {
1350                let caep_session = {
1351                    let sessions = self.sessions.read().await;
1352                    sessions.get(&oidc_session.session_id).cloned()
1353                };
1354
1355                comprehensive_sessions.push(ComprehensiveSessionInfo {
1356                    oidc_session: oidc_session.clone(),
1357                    is_monitored_by_caep: caep_session.is_some(),
1358                    caep_session,
1359                });
1360            }
1361        }
1362
1363        Ok(comprehensive_sessions)
1364    }
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369    use super::*;
1370    use tokio;
1371
1372    #[tokio::test]
1373    async fn test_caep_event_creation() {
1374        let event = CaepEvent {
1375            id: Uuid::new_v4(),
1376            event_type: CaepEventType::RiskScoreChange,
1377            subject: "user123".to_string(),
1378            severity: CaepEventSeverity::High,
1379            timestamp: Utc::now(),
1380            source: CaepEventSource {
1381                system_id: "risk_engine".to_string(),
1382                source_type: "ml_model".to_string(),
1383                version: Some("1.0.0".to_string()),
1384                metadata: HashMap::new(),
1385            },
1386            risk_score: 0.85,
1387            session_id: Some("session123".to_string()),
1388            location: None,
1389            device_info: None,
1390            event_data: serde_json::json!({
1391                "previous_score": 0.3,
1392                "new_score": 0.85,
1393                "trigger": "suspicious_login_pattern"
1394            }),
1395            correlation_id: None,
1396        };
1397
1398        assert_eq!(event.subject, "user123");
1399        assert_eq!(event.risk_score, 0.85);
1400        assert!(matches!(event.severity, CaepEventSeverity::High));
1401    }
1402
1403    #[tokio::test]
1404    async fn test_caep_config_creation() {
1405        let config = CaepConfig::default();
1406        assert!(!config.event_stream_url.is_empty());
1407        assert!(config.auto_revoke);
1408        assert_eq!(config.auto_revoke_threshold, 0.8);
1409    }
1410
1411    #[tokio::test]
1412    async fn test_severity_comparison() {
1413        // Test severity level comparison logic
1414        let high_level = match CaepEventSeverity::High {
1415            CaepEventSeverity::Critical => 4,
1416            CaepEventSeverity::High => 3,
1417            CaepEventSeverity::Medium => 2,
1418            CaepEventSeverity::Low => 1,
1419        };
1420
1421        let medium_level = match CaepEventSeverity::Medium {
1422            CaepEventSeverity::Critical => 4,
1423            CaepEventSeverity::High => 3,
1424            CaepEventSeverity::Medium => 2,
1425            CaepEventSeverity::Low => 1,
1426        };
1427
1428        assert!(high_level > medium_level);
1429    }
1430}