1use crate::errors::{AuthError, Result};
84use crate::server::core::stepped_up_auth::SteppedUpAuthManager;
85use crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutManager;
86use crate::server::oidc::oidc_session_management::SessionManager;
87
88use async_trait::async_trait;
89use chrono::{DateTime, Duration, Timelike, Utc};
90use serde::{Deserialize, Serialize};
91use std::collections::HashMap;
92use std::sync::Arc;
93use tokio::sync::{RwLock, broadcast};
94use tokio::time::{Interval, interval};
95use uuid::Uuid;
96
97type EventHandlerMap = Arc<RwLock<HashMap<CaepEventType, Vec<Arc<dyn CaepEventHandler>>>>>;
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct CaepConfig {
103 pub event_stream_url: String,
105
106 pub evaluation_interval: Duration,
108
109 pub auto_revoke: bool,
111
112 pub auto_revoke_threshold: f32,
114
115 pub max_concurrent_processors: usize,
117
118 pub event_retention_period: Duration,
120
121 pub propagation_endpoints: Vec<String>,
123
124 pub evaluation_rules: Vec<CaepEvaluationRule>,
126
127 pub signing_secret: String,
130}
131
132impl Default for CaepConfig {
133 fn default() -> Self {
134 use ring::rand::{SecureRandom, SystemRandom};
135 let rng = SystemRandom::new();
138 let mut bytes = [0u8; 32];
139 rng.fill(&mut bytes)
140 .expect("AuthFramework fatal: system CSPRNG unavailable — the operating system cannot provide cryptographic randomness");
141 let signing_secret = bytes.iter().fold(String::with_capacity(64), |mut s, b| {
142 s.push_str(&format!("{b:02x}"));
143 s
144 });
145
146 Self {
147 event_stream_url: "wss://localhost:8080/caep/events".to_string(),
148 evaluation_interval: Duration::try_seconds(30).unwrap_or(Duration::zero()),
149 auto_revoke: true,
150 auto_revoke_threshold: 0.8,
151 max_concurrent_processors: 10,
152 event_retention_period: Duration::try_hours(24).unwrap_or(Duration::zero()),
153 propagation_endpoints: Vec::new(),
154 evaluation_rules: Vec::new(),
155 signing_secret,
158 }
159 }
160}
161
162#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
164#[serde(rename_all = "snake_case")]
165pub enum CaepEventType {
166 UserLogin,
168 UserLogout,
169 UserProfileChange,
170 UserCredentialChange,
171
172 SessionCreated,
174 SessionModified,
175 SessionTimeout,
176 SessionSuspiciousActivity,
177
178 RiskScoreChange,
180 LocationChange,
181 DeviceChange,
182 BehavioralAnomaly,
183
184 PolicyUpdate,
186 ComplianceViolation,
187 AccessPatternAnomaly,
188
189 SystemOutage,
191 SecurityIncident,
192 DataBreach,
193
194 Custom(String),
196}
197
198#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
200#[serde(rename_all = "snake_case")]
201pub enum CaepEventSeverity {
202 Low,
203 Medium,
204 High,
205 Critical,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct CaepEventSource {
211 pub system_id: String,
213
214 pub source_type: String,
216
217 pub version: Option<String>,
219
220 pub metadata: HashMap<String, serde_json::Value>,
222}
223
224#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct CaepEvent {
227 pub id: Uuid,
229
230 pub event_type: CaepEventType,
232
233 pub subject: String,
235
236 pub severity: CaepEventSeverity,
238
239 pub timestamp: DateTime<Utc>,
241
242 pub source: CaepEventSource,
244
245 pub risk_score: f32,
247
248 pub session_id: Option<String>,
250
251 pub location: Option<CaepLocationInfo>,
253
254 pub device_info: Option<CaepDeviceInfo>,
256
257 pub event_data: serde_json::Value,
259
260 pub correlation_id: Option<Uuid>,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
266pub struct CaepLocationInfo {
267 pub country: Option<String>,
269
270 pub region: Option<String>,
272
273 pub city: Option<String>,
275
276 pub latitude: Option<f64>,
278
279 pub longitude: Option<f64>,
281
282 pub ip_address: Option<String>,
284
285 pub is_suspicious: bool,
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct CaepDeviceInfo {
292 pub device_id: Option<String>,
294
295 pub device_type: Option<String>,
297
298 pub os: Option<String>,
300
301 pub client: Option<String>,
303
304 pub is_trusted: bool,
306
307 pub requires_binding: bool,
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize)]
313pub struct CaepEvaluationRule {
314 pub id: String,
316
317 pub description: String,
319
320 pub applicable_events: Vec<CaepEventType>,
322
323 pub conditions: Vec<CaepRuleCondition>,
325
326 pub actions: Vec<CaepRuleAction>,
328
329 pub priority: i32,
331
332 pub enabled: bool,
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize)]
338#[serde(tag = "type", rename_all = "snake_case")]
339pub enum CaepRuleCondition {
340 RiskScoreAbove { threshold: f32 },
342
343 SeverityAtLeast { severity: CaepEventSeverity },
345
346 LocationChange { suspicious_only: bool },
348
349 UnknownDevice { require_trusted: bool },
351
352 OutsideBusinessHours { timezone: String },
354
355 Custom { expression: String },
357}
358
359#[derive(Debug, Clone, Serialize, Deserialize)]
361#[serde(tag = "type", rename_all = "snake_case")]
362pub enum CaepRuleAction {
363 RevokeAccess { immediate: bool },
365
366 RequireStepUp { level: String },
368
369 SendNotification { channels: Vec<String> },
371
372 LogEvent { level: String },
374
375 TriggerWebhook { url: String },
377
378 QuarantineSession { duration_minutes: u32 },
380}
381
382#[derive(Debug, Clone, Serialize, Deserialize)]
384pub struct CaepEvaluationResult {
385 pub subject: String,
387
388 pub access_decision: CaepAccessDecision,
390
391 pub risk_score: f32,
393
394 pub triggered_rules: Vec<String>,
396
397 pub required_actions: Vec<CaepRuleAction>,
399
400 pub evaluated_at: DateTime<Utc>,
402
403 pub next_evaluation: DateTime<Utc>,
405}
406
407#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
409#[serde(rename_all = "snake_case")]
410pub enum CaepAccessDecision {
411 Allow,
413
414 AllowWithMonitoring,
416
417 AllowWithStepUp,
419
420 TemporaryDeny,
422
423 Deny,
425}
426
427#[derive(Debug, Clone, Serialize, Deserialize)]
429pub struct CaepSessionState {
430 pub session_id: String,
432
433 pub subject: String,
435
436 pub risk_score: f32,
438
439 pub last_evaluation: Option<CaepEvaluationResult>,
441
442 pub active_events: Vec<CaepEvent>,
444
445 pub created_at: DateTime<Utc>,
447
448 pub last_activity: DateTime<Utc>,
450
451 pub is_quarantined: bool,
453
454 pub quarantine_until: Option<DateTime<Utc>>,
456}
457
458#[derive(Debug, Clone, Serialize, Deserialize)]
460pub struct ComprehensiveSessionInfo {
461 pub oidc_session: crate::server::oidc::oidc_session_management::OidcSession,
463
464 pub caep_session: Option<CaepSessionState>,
466
467 pub is_monitored_by_caep: bool,
469}
470
471#[async_trait]
473pub trait CaepEventHandler: Send + Sync {
474 async fn handle_event(&self, event: &CaepEvent) -> Result<()>;
476
477 fn supported_event_types(&self) -> Vec<CaepEventType>;
479}
480
481pub struct CaepManager {
483 config: CaepConfig,
485
486 session_manager: Arc<SessionManager>,
488
489 logout_manager: Arc<BackChannelLogoutManager>,
491
492 step_up_manager: Option<Arc<SteppedUpAuthManager>>,
494
495 sessions: Arc<RwLock<HashMap<String, CaepSessionState>>>,
497
498 event_handlers: EventHandlerMap,
500
501 event_broadcaster: broadcast::Sender<CaepEvent>,
503
504 evaluation_interval: Interval,
506
507 event_history: Arc<RwLock<Vec<CaepEvent>>>,
509
510 rules: Arc<RwLock<Vec<CaepEvaluationRule>>>,
512}
513
514impl CaepManager {
515 pub async fn new(
517 config: CaepConfig,
518 session_manager: Arc<SessionManager>,
519 logout_manager: Arc<BackChannelLogoutManager>,
520 ) -> Result<Self> {
521 let (event_broadcaster, _) = broadcast::channel(1000);
522 let evaluation_interval = interval(config.evaluation_interval.to_std().map_err(|e| {
523 AuthError::Configuration {
524 message: format!("Invalid evaluation interval: {}", e),
525 help: Some("Provide a valid duration for evaluation interval".to_string()),
526 docs_url: Some("https://docs.auth-framework.com/configuration".to_string()),
527 source: None,
528 suggested_fix: Some("Check your configuration and ensure the evaluation interval is properly formatted".to_string()),
529 }
530 })?);
531
532 Ok(Self {
533 config: config.clone(),
534 session_manager,
535 logout_manager,
536 step_up_manager: None,
537 sessions: Arc::new(RwLock::new(HashMap::new())),
538 event_handlers: Arc::new(RwLock::new(HashMap::new())),
539 event_broadcaster,
540 evaluation_interval,
541 event_history: Arc::new(RwLock::new(Vec::new())),
542 rules: Arc::new(RwLock::new(config.evaluation_rules)),
543 })
544 }
545
546 pub fn with_step_up_manager(mut self, step_up_manager: Arc<SteppedUpAuthManager>) -> Self {
548 self.step_up_manager = Some(step_up_manager);
549 self
550 }
551
552 pub async fn register_event_handler(
554 &self,
555 event_type: CaepEventType,
556 handler: Arc<dyn CaepEventHandler>,
557 ) -> Result<()> {
558 let mut handlers = self.event_handlers.write().await;
559 handlers.entry(event_type).or_default().push(handler);
560 Ok(())
561 }
562
563 pub async fn process_event(&self, event: CaepEvent) -> Result<CaepEvaluationResult> {
565 {
567 let mut history = self.event_history.write().await;
568 history.push(event.clone());
569
570 let retention_cutoff = Utc::now() - self.config.event_retention_period;
572 history.retain(|e| e.timestamp >= retention_cutoff);
573 }
574
575 if let Err(e) = self.event_broadcaster.send(event.clone()) {
577 tracing::warn!("Failed to broadcast CAEP event: {}", e);
578 }
579
580 if let Some(session_id) = &event.session_id {
582 self.update_session_state(session_id, &event).await?;
583 }
584
585 let evaluation_result = self.evaluate_access(&event.subject, Some(&event)).await?;
587
588 self.execute_actions(&evaluation_result).await?;
590
591 self.notify_handlers(&event).await?;
593
594 Ok(evaluation_result)
595 }
596
597 pub async fn evaluate_access(
599 &self,
600 subject: &str,
601 triggering_event: Option<&CaepEvent>,
602 ) -> Result<CaepEvaluationResult> {
603 let rules = self.rules.read().await;
604 let mut triggered_rules = Vec::new();
605 let mut required_actions = Vec::new();
606 let risk_score = if let Some(event) = triggering_event {
607 event.risk_score
608 } else {
609 self.calculate_risk_score(subject).await?
611 };
612
613 for rule in rules.iter() {
615 if !rule.enabled {
616 continue;
617 }
618
619 if let Some(event) = triggering_event
620 && !rule.applicable_events.contains(&event.event_type)
621 {
622 continue;
623 }
624
625 if self
626 .evaluate_rule_conditions(rule, subject, triggering_event, risk_score)
627 .await?
628 {
629 triggered_rules.push(rule.id.clone());
630 required_actions.extend(rule.actions.clone());
631 }
632 }
633
634 let access_decision = self.determine_access_decision(risk_score, &required_actions);
636
637 let now = Utc::now();
638 Ok(CaepEvaluationResult {
639 subject: subject.to_string(),
640 access_decision,
641 risk_score,
642 triggered_rules,
643 required_actions,
644 evaluated_at: now,
645 next_evaluation: now + self.config.evaluation_interval,
646 })
647 }
648
649 pub async fn start_continuous_evaluation(&mut self) -> Result<()> {
651 loop {
652 self.evaluation_interval.tick().await;
653
654 self.synchronize_with_session_manager().await?;
656
657 let sessions = {
659 let sessions_guard = self.sessions.read().await;
660 sessions_guard.keys().cloned().collect::<Vec<_>>()
661 };
662
663 for session_id in sessions {
664 if let Some(session_state) = self.sessions.read().await.get(&session_id) {
665 let evaluation = self.evaluate_access(&session_state.subject, None).await?;
666 self.execute_actions(&evaluation).await?;
667 }
668 }
669 }
670 }
671
672 async fn synchronize_with_session_manager(&self) -> Result<()> {
674 let mut sessions = self.sessions.write().await;
675 let mut sessions_to_remove = Vec::new();
676
677 for (session_id, caep_session) in sessions.iter() {
678 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
680 if !self.session_manager.is_session_valid(session_id) {
681 tracing::info!("CAEP removing expired session: {}", session_id);
682 sessions_to_remove.push(session_id.clone());
683 }
684 else if oidc_session.sub != caep_session.subject {
686 tracing::warn!("CAEP session subject mismatch, removing: {}", session_id);
687 sessions_to_remove.push(session_id.clone());
688 }
689 } else {
690 tracing::info!("CAEP removing orphaned session: {}", session_id);
691 sessions_to_remove.push(session_id.clone());
692 }
693 }
694
695 for session_id in sessions_to_remove {
697 sessions.remove(&session_id);
698 }
699
700 Ok(())
701 }
702
703 pub async fn revoke_subject_access(&self, subject: &str) -> Result<()> {
705 tracing::info!("CAEP revoking access for subject: {}", subject);
706
707 let sessions_to_logout = {
709 let sessions = self.sessions.read().await;
710 sessions
711 .iter()
712 .filter(|(_, session)| session.subject == subject)
713 .map(|(session_id, session)| (session_id.clone(), session.clone()))
714 .collect::<Vec<_>>()
715 };
716
717 for (session_id, _) in &sessions_to_logout {
719 let logout_request =
721 crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest {
722 session_id: session_id.clone(),
723 sub: subject.to_string(),
724 sid: Some(session_id.clone()),
725 iss: "caep-manager".to_string(), initiating_client_id: None, additional_events: Some({
728 let mut events = HashMap::new();
729 events.insert(
730 "caep_reason".to_string(),
731 serde_json::json!("automatic_revocation"),
732 );
733 events.insert(
734 "timestamp".to_string(),
735 serde_json::json!(Utc::now().timestamp()),
736 );
737 events
738 }),
739 };
740
741 match self.process_backchannel_logout(&logout_request).await {
744 Ok(_) => {
745 tracing::info!(
746 "Successfully initiated back-channel logout for session {} (subject: {})",
747 session_id,
748 subject
749 );
750 }
751 Err(e) => {
752 tracing::error!(
753 "Failed to initiate back-channel logout for session {} (subject: {}): {}",
754 session_id,
755 subject,
756 e
757 );
758 }
759 }
760 }
761
762 let mut sessions = self.sessions.write().await;
764 sessions.retain(|_, session| session.subject != subject);
765
766 Ok(())
767 }
768
769 async fn process_backchannel_logout(
771 &self,
772 logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
773 ) -> Result<()> {
774 let logout_metadata = self.logout_manager.get_discovery_metadata();
779 tracing::info!("Logout manager capabilities: {:?}", logout_metadata);
780
781 let logout_token = self
783 .create_logout_token_for_caep_revocation(logout_request)
784 .await?;
785
786 self.handle_caep_logout(logout_request, &logout_token)
788 .await?;
789
790 tracing::info!("CAEP backchannel logout processed successfully");
791 Ok(())
792 }
793
794 async fn create_logout_token_for_caep_revocation(
796 &self,
797 logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
798 ) -> Result<String> {
799 use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
800 use serde_json::json;
801
802 let claims = json!({
804 "iss": logout_request.iss,
805 "sub": logout_request.sub,
806 "aud": ["caep-manager"],
807 "exp": (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp(),
808 "iat": chrono::Utc::now().timestamp(),
809 "jti": uuid::Uuid::new_v4().to_string(),
810 "sid": logout_request.sid,
811 "events": {
812 "http://schemas.openid.net/secevent/caep/event-type/session-revoked": {}
813 },
814 "caep_reason": logout_request.additional_events
815 .as_ref()
816 .and_then(|events| events.get("caep_reason"))
817 .unwrap_or(&serde_json::json!("automatic_revocation"))
818 });
819
820 let key = EncodingKey::from_secret(self.config.signing_secret.as_bytes());
822 let header = Header::new(Algorithm::HS256);
823
824 let token = encode(&header, &claims, &key).map_err(|e| {
825 AuthError::auth_method("caep", format!("Failed to create logout token: {}", e))
826 })?;
827
828 Ok(token)
829 }
830
831 async fn handle_caep_logout(
833 &self,
834 logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
835 logout_token: &str,
836 ) -> Result<()> {
837 tracing::info!(
845 "Processing CAEP logout for session: {}",
846 logout_request.session_id
847 );
848
849 {
851 let mut sessions = self.sessions.write().await;
852 if let Some(_session) = sessions.get(&logout_request.session_id) {
853 sessions.remove(&logout_request.session_id);
855 }
856 }
857
858 let caep_event = CaepEvent {
860 id: uuid::Uuid::new_v4(),
861 event_type: CaepEventType::UserLogout, subject: logout_request.sub.clone(),
863 session_id: Some(logout_request.session_id.clone()),
864 timestamp: chrono::Utc::now(),
865 severity: CaepEventSeverity::High,
866 source: CaepEventSource {
867 system_id: "caep-manager".to_string(),
868 source_type: "caep_automatic_revocation".to_string(),
869 version: Some("1.0".to_string()),
870 metadata: std::collections::HashMap::new(),
871 },
872 risk_score: 1.0, location: None,
874 device_info: None,
875 event_data: serde_json::json!({
876 "logout_token": logout_token,
877 "initiator": "caep_automatic_revocation",
878 "reason": logout_request.additional_events
879 .as_ref()
880 .and_then(|events| events.get("caep_reason"))
881 .cloned()
882 .unwrap_or_else(|| serde_json::json!("automatic_revocation"))
883 }),
884 correlation_id: Some(uuid::Uuid::new_v4()),
885 };
886
887 if let Err(e) = self.event_broadcaster.send(caep_event) {
889 tracing::warn!("Failed to broadcast CAEP logout event: {}", e);
890 }
891
892 tracing::info!(
893 "CAEP logout completed for session: {}",
894 logout_request.session_id
895 );
896 Ok(())
897 }
898
899 async fn calculate_risk_score(&self, subject: &str) -> Result<f32> {
901 let history = self.event_history.read().await;
902 let recent_cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
903
904 let recent_events: Vec<_> = history
905 .iter()
906 .filter(|e| e.subject == subject && e.timestamp >= recent_cutoff)
907 .collect();
908
909 if recent_events.is_empty() {
910 return Ok(0.0);
911 }
912
913 let mut total_risk = 0.0;
915 let mut total_weight = 0.0;
916
917 for event in recent_events {
918 let weight = match event.severity {
919 CaepEventSeverity::Low => 1.0,
920 CaepEventSeverity::Medium => 2.0,
921 CaepEventSeverity::High => 4.0,
922 CaepEventSeverity::Critical => 8.0,
923 };
924
925 total_risk += event.risk_score * weight;
926 total_weight += weight;
927 }
928
929 Ok(if total_weight > 0.0 {
930 (total_risk / total_weight).min(1.0)
931 } else {
932 0.0
933 })
934 }
935
936 async fn update_session_state(&self, session_id: &str, event: &CaepEvent) -> Result<()> {
938 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
940 if !self.session_manager.is_session_valid(session_id) {
942 tracing::warn!(
943 "CAEP received event for expired OIDC session: {}",
944 session_id
945 );
946 let mut sessions = self.sessions.write().await;
948 sessions.remove(session_id);
949 return Ok(());
950 }
951
952 if oidc_session.sub != event.subject {
954 return Err(AuthError::validation(
955 "Subject mismatch between CAEP event and OIDC session",
956 ));
957 }
958 } else {
959 tracing::warn!(
960 "CAEP received event for unknown OIDC session: {}",
961 session_id
962 );
963 return Err(AuthError::validation("Session not found in SessionManager"));
964 }
965
966 let mut sessions = self.sessions.write().await;
968
969 let session_state =
970 sessions
971 .entry(session_id.to_string())
972 .or_insert_with(|| CaepSessionState {
973 session_id: session_id.to_string(),
974 subject: event.subject.clone(),
975 risk_score: event.risk_score,
976 last_evaluation: None,
977 active_events: Vec::new(),
978 created_at: Utc::now(),
979 last_activity: Utc::now(),
980 is_quarantined: false,
981 quarantine_until: None,
982 });
983
984 session_state.risk_score = event.risk_score;
985 session_state.last_activity = Utc::now();
986 session_state.active_events.push(event.clone());
987
988 let cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
990 session_state
991 .active_events
992 .retain(|e| e.timestamp >= cutoff);
993
994 Ok(())
995 }
996
997 async fn evaluate_rule_conditions(
999 &self,
1000 rule: &CaepEvaluationRule,
1001 _subject: &str,
1002 event: Option<&CaepEvent>,
1003 risk_score: f32,
1004 ) -> Result<bool> {
1005 for condition in &rule.conditions {
1006 match condition {
1007 CaepRuleCondition::RiskScoreAbove { threshold } => {
1008 if risk_score <= *threshold {
1009 return Ok(false);
1010 }
1011 }
1012 CaepRuleCondition::SeverityAtLeast { severity } => {
1013 if let Some(event) = event {
1014 let event_severity_level = match event.severity {
1015 CaepEventSeverity::Critical => 4,
1016 CaepEventSeverity::High => 3,
1017 CaepEventSeverity::Medium => 2,
1018 CaepEventSeverity::Low => 1,
1019 };
1020
1021 let required_severity_level = match severity {
1022 CaepEventSeverity::Critical => 4,
1023 CaepEventSeverity::High => 3,
1024 CaepEventSeverity::Medium => 2,
1025 CaepEventSeverity::Low => 1,
1026 };
1027
1028 if event_severity_level < required_severity_level {
1029 return Ok(false);
1030 }
1031 } else {
1032 return Ok(false);
1033 }
1034 }
1035 CaepRuleCondition::LocationChange { suspicious_only } => {
1036 if let Some(event) = event {
1037 if let Some(location) = &event.location {
1038 if *suspicious_only && !location.is_suspicious {
1039 return Ok(false);
1040 }
1041 } else {
1042 return Ok(false);
1043 }
1044 } else {
1045 return Ok(false);
1046 }
1047 }
1048 CaepRuleCondition::UnknownDevice { require_trusted } => {
1049 if let Some(event) = event
1050 && let Some(device) = &event.device_info
1051 && *require_trusted
1052 && device.is_trusted
1053 {
1054 return Ok(false);
1055 }
1056 }
1057 CaepRuleCondition::OutsideBusinessHours { timezone: _ } => {
1058 let hour = Utc::now().hour();
1060 if (9..17).contains(&hour) {
1061 return Ok(false);
1062 }
1063 }
1064 CaepRuleCondition::Custom { expression } => {
1065 if !Self::evaluate_simple_expression(expression) {
1067 return Ok(false);
1068 }
1069 }
1070 }
1071 }
1072
1073 Ok(true)
1074 }
1075
1076 fn evaluate_simple_expression(expression: &str) -> bool {
1079 let parts: Vec<&str> = expression.split_whitespace().collect();
1080 if parts.len() != 3 {
1081 tracing::warn!(
1082 expression = expression,
1083 "Unrecognized custom CAEP expression format, defaulting to false"
1084 );
1085 return false;
1086 }
1087
1088 let (_field, op, value_str) = (parts[0], parts[1], parts[2]);
1089 let Ok(threshold) = value_str.parse::<f64>() else {
1090 tracing::warn!(
1091 expression = expression,
1092 "Cannot parse threshold value in custom CAEP expression, defaulting to false"
1093 );
1094 return false;
1095 };
1096
1097 tracing::debug!(
1100 expression = expression,
1101 threshold = threshold,
1102 op = op,
1103 "Custom CAEP expression parsed but no runtime context available, defaulting to false"
1104 );
1105 false
1106 }
1107
1108 fn determine_access_decision(
1110 &self,
1111 risk_score: f32,
1112 actions: &[CaepRuleAction],
1113 ) -> CaepAccessDecision {
1114 for action in actions {
1115 match action {
1116 CaepRuleAction::RevokeAccess { immediate: true } => {
1117 return CaepAccessDecision::Deny;
1118 }
1119 CaepRuleAction::RevokeAccess { immediate: false } => {
1120 return CaepAccessDecision::TemporaryDeny;
1121 }
1122 CaepRuleAction::RequireStepUp { .. } => {
1123 return CaepAccessDecision::AllowWithStepUp;
1124 }
1125 CaepRuleAction::QuarantineSession { .. } => {
1126 return CaepAccessDecision::TemporaryDeny;
1127 }
1128 _ => {}
1129 }
1130 }
1131
1132 if risk_score >= self.config.auto_revoke_threshold {
1133 CaepAccessDecision::Deny
1134 } else if risk_score >= 0.6 {
1135 CaepAccessDecision::AllowWithMonitoring
1136 } else {
1137 CaepAccessDecision::Allow
1138 }
1139 }
1140
1141 async fn execute_actions(&self, evaluation: &CaepEvaluationResult) -> Result<()> {
1143 for action in &evaluation.required_actions {
1144 match action {
1145 CaepRuleAction::RevokeAccess { .. } => {
1146 self.revoke_subject_access(&evaluation.subject).await?;
1147 }
1148 CaepRuleAction::RequireStepUp { level } => {
1149 if let Some(_step_up_manager) = &self.step_up_manager {
1150 tracing::info!(
1152 "CAEP requiring step-up to level {} for subject {}",
1153 level,
1154 evaluation.subject
1155 );
1156 }
1157 }
1158 CaepRuleAction::SendNotification { channels } => {
1159 tracing::info!(
1160 "CAEP sending notification via channels {:?} for subject {}",
1161 channels,
1162 evaluation.subject
1163 );
1164 }
1165 CaepRuleAction::LogEvent { level } => {
1166 tracing::info!(
1167 "CAEP logging event at level {} for subject {}",
1168 level,
1169 evaluation.subject
1170 );
1171 }
1172 CaepRuleAction::TriggerWebhook { url } => {
1173 tracing::info!(
1174 "CAEP triggering webhook {} for subject {}",
1175 url,
1176 evaluation.subject
1177 );
1178 }
1179 CaepRuleAction::QuarantineSession { duration_minutes } => {
1180 self.quarantine_session(&evaluation.subject, *duration_minutes)
1181 .await?;
1182 }
1183 }
1184 }
1185
1186 Ok(())
1187 }
1188
1189 async fn quarantine_session(&self, subject: &str, duration_minutes: u32) -> Result<()> {
1191 let mut sessions = self.sessions.write().await;
1192 let quarantine_until =
1193 Utc::now() + Duration::try_minutes(duration_minutes as i64).unwrap_or(Duration::zero());
1194
1195 let mut quarantined_session_ids = Vec::new();
1197 for session in sessions.values_mut() {
1198 if session.subject == subject {
1199 session.is_quarantined = true;
1200 session.quarantine_until = Some(quarantine_until);
1201 quarantined_session_ids.push(session.session_id.clone());
1202 }
1203 }
1204
1205 tracing::info!(
1206 "CAEP quarantined {} sessions for subject {} until {}. Session IDs: {:?}",
1207 quarantined_session_ids.len(),
1208 subject,
1209 quarantine_until,
1210 quarantined_session_ids
1211 );
1212
1213 Ok(())
1217 }
1218
1219 async fn notify_handlers(&self, event: &CaepEvent) -> Result<()> {
1221 let handlers = self.event_handlers.read().await;
1222
1223 if let Some(event_handlers) = handlers.get(&event.event_type) {
1224 for handler in event_handlers {
1225 if let Err(e) = handler.handle_event(event).await {
1226 tracing::error!("CAEP event handler failed: {}", e);
1227 }
1228 }
1229 }
1230
1231 Ok(())
1232 }
1233
1234 pub async fn get_session_state(&self, session_id: &str) -> Result<Option<CaepSessionState>> {
1236 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1238 if !self.session_manager.is_session_valid(session_id) {
1239 let mut sessions = self.sessions.write().await;
1241 sessions.remove(session_id);
1242 return Ok(None);
1243 }
1244
1245 let sessions = self.sessions.read().await;
1247 if let Some(caep_session) = sessions.get(session_id) {
1248 if caep_session.subject == oidc_session.sub {
1249 return Ok(Some(caep_session.clone()));
1250 } else {
1251 tracing::warn!(
1252 "Subject mismatch between CAEP and OIDC sessions for {}",
1253 session_id
1254 );
1255 return Ok(None);
1256 }
1257 }
1258 }
1259
1260 Ok(None)
1262 }
1263
1264 pub async fn get_event_history(
1266 &self,
1267 subject: &str,
1268 limit: Option<usize>,
1269 ) -> Result<Vec<CaepEvent>> {
1270 let history = self.event_history.read().await;
1271 let mut events: Vec<_> = history
1272 .iter()
1273 .filter(|e| e.subject == subject)
1274 .cloned()
1275 .collect();
1276
1277 events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1278
1279 if let Some(limit) = limit {
1280 events.truncate(limit);
1281 }
1282
1283 Ok(events)
1284 }
1285
1286 pub async fn add_evaluation_rule(&self, rule: CaepEvaluationRule) -> Result<()> {
1288 let mut rules = self.rules.write().await;
1289
1290 rules.retain(|r| r.id != rule.id);
1292
1293 rules.push(rule);
1295 rules.sort_by(|a, b| b.priority.cmp(&a.priority));
1296
1297 Ok(())
1298 }
1299
1300 pub async fn remove_evaluation_rule(&self, rule_id: &str) -> Result<bool> {
1302 let mut rules = self.rules.write().await;
1303 let original_len = rules.len();
1304 rules.retain(|r| r.id != rule_id);
1305 Ok(rules.len() < original_len)
1306 }
1307
1308 pub async fn get_comprehensive_session_info(
1310 &self,
1311 session_id: &str,
1312 ) -> Result<Option<ComprehensiveSessionInfo>> {
1313 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1315 if !self.session_manager.is_session_valid(session_id) {
1316 return Ok(None);
1317 }
1318
1319 let caep_session = {
1321 let sessions = self.sessions.read().await;
1322 sessions.get(session_id).cloned()
1323 };
1324
1325 let comprehensive_info = ComprehensiveSessionInfo {
1326 oidc_session: oidc_session.clone(),
1327 is_monitored_by_caep: caep_session.is_some(),
1328 caep_session,
1329 };
1330
1331 Ok(Some(comprehensive_info))
1332 } else {
1333 Ok(None)
1334 }
1335 }
1336
1337 pub async fn get_subject_sessions(
1339 &self,
1340 subject: &str,
1341 ) -> Result<Vec<ComprehensiveSessionInfo>> {
1342 let oidc_sessions = self.session_manager.get_sessions_for_subject(subject);
1343 let mut comprehensive_sessions = Vec::new();
1344
1345 for oidc_session in oidc_sessions {
1346 if self
1347 .session_manager
1348 .is_session_valid(&oidc_session.session_id)
1349 {
1350 let caep_session = {
1351 let sessions = self.sessions.read().await;
1352 sessions.get(&oidc_session.session_id).cloned()
1353 };
1354
1355 comprehensive_sessions.push(ComprehensiveSessionInfo {
1356 oidc_session: oidc_session.clone(),
1357 is_monitored_by_caep: caep_session.is_some(),
1358 caep_session,
1359 });
1360 }
1361 }
1362
1363 Ok(comprehensive_sessions)
1364 }
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369 use super::*;
1370 use tokio;
1371
1372 #[tokio::test]
1373 async fn test_caep_event_creation() {
1374 let event = CaepEvent {
1375 id: Uuid::new_v4(),
1376 event_type: CaepEventType::RiskScoreChange,
1377 subject: "user123".to_string(),
1378 severity: CaepEventSeverity::High,
1379 timestamp: Utc::now(),
1380 source: CaepEventSource {
1381 system_id: "risk_engine".to_string(),
1382 source_type: "ml_model".to_string(),
1383 version: Some("1.0.0".to_string()),
1384 metadata: HashMap::new(),
1385 },
1386 risk_score: 0.85,
1387 session_id: Some("session123".to_string()),
1388 location: None,
1389 device_info: None,
1390 event_data: serde_json::json!({
1391 "previous_score": 0.3,
1392 "new_score": 0.85,
1393 "trigger": "suspicious_login_pattern"
1394 }),
1395 correlation_id: None,
1396 };
1397
1398 assert_eq!(event.subject, "user123");
1399 assert_eq!(event.risk_score, 0.85);
1400 assert!(matches!(event.severity, CaepEventSeverity::High));
1401 }
1402
1403 #[tokio::test]
1404 async fn test_caep_config_creation() {
1405 let config = CaepConfig::default();
1406 assert!(!config.event_stream_url.is_empty());
1407 assert!(config.auto_revoke);
1408 assert_eq!(config.auto_revoke_threshold, 0.8);
1409 }
1410
1411 #[tokio::test]
1412 async fn test_severity_comparison() {
1413 let high_level = match CaepEventSeverity::High {
1415 CaepEventSeverity::Critical => 4,
1416 CaepEventSeverity::High => 3,
1417 CaepEventSeverity::Medium => 2,
1418 CaepEventSeverity::Low => 1,
1419 };
1420
1421 let medium_level = match CaepEventSeverity::Medium {
1422 CaepEventSeverity::Critical => 4,
1423 CaepEventSeverity::High => 3,
1424 CaepEventSeverity::Medium => 2,
1425 CaepEventSeverity::Low => 1,
1426 };
1427
1428 assert!(high_level > medium_level);
1429 }
1430}