1use crate::errors::{AuthError, Result};
84use crate::server::core::stepped_up_auth::SteppedUpAuthManager;
85use crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutManager;
86use crate::server::oidc::oidc_session_management::SessionManager;
87
88use async_trait::async_trait;
89use chrono::{DateTime, Duration, Timelike, Utc};
90use serde::{Deserialize, Serialize};
91use std::collections::HashMap;
92use std::sync::Arc;
93use tokio::sync::{RwLock, broadcast};
94use tokio::time::{Interval, interval};
95use uuid::Uuid;
96
97type EventHandlerMap = Arc<RwLock<HashMap<CaepEventType, Vec<Arc<dyn CaepEventHandler>>>>>;
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct CaepConfig {
103 pub event_stream_url: String,
105
106 pub evaluation_interval: Duration,
108
109 pub auto_revoke: bool,
111
112 pub auto_revoke_threshold: f32,
114
115 pub max_concurrent_processors: usize,
117
118 pub event_retention_period: Duration,
120
121 pub propagation_endpoints: Vec<String>,
123
124 pub evaluation_rules: Vec<CaepEvaluationRule>,
126}
127
128impl Default for CaepConfig {
129 fn default() -> Self {
130 Self {
131 event_stream_url: "wss://localhost:8080/caep/events".to_string(),
132 evaluation_interval: Duration::try_seconds(30).unwrap_or(Duration::zero()),
133 auto_revoke: true,
134 auto_revoke_threshold: 0.8,
135 max_concurrent_processors: 10,
136 event_retention_period: Duration::try_hours(24).unwrap_or(Duration::zero()),
137 propagation_endpoints: Vec::new(),
138 evaluation_rules: Vec::new(),
139 }
140 }
141}
142
143#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
145#[serde(rename_all = "snake_case")]
146pub enum CaepEventType {
147 UserLogin,
149 UserLogout,
150 UserProfileChange,
151 UserCredentialChange,
152
153 SessionCreated,
155 SessionModified,
156 SessionTimeout,
157 SessionSuspiciousActivity,
158
159 RiskScoreChange,
161 LocationChange,
162 DeviceChange,
163 BehavioralAnomaly,
164
165 PolicyUpdate,
167 ComplianceViolation,
168 AccessPatternAnomaly,
169
170 SystemOutage,
172 SecurityIncident,
173 DataBreach,
174
175 Custom(String),
177}
178
179#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
181#[serde(rename_all = "snake_case")]
182pub enum CaepEventSeverity {
183 Low,
184 Medium,
185 High,
186 Critical,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct CaepEventSource {
192 pub system_id: String,
194
195 pub source_type: String,
197
198 pub version: Option<String>,
200
201 pub metadata: HashMap<String, serde_json::Value>,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct CaepEvent {
208 pub id: Uuid,
210
211 pub event_type: CaepEventType,
213
214 pub subject: String,
216
217 pub severity: CaepEventSeverity,
219
220 pub timestamp: DateTime<Utc>,
222
223 pub source: CaepEventSource,
225
226 pub risk_score: f32,
228
229 pub session_id: Option<String>,
231
232 pub location: Option<CaepLocationInfo>,
234
235 pub device_info: Option<CaepDeviceInfo>,
237
238 pub event_data: serde_json::Value,
240
241 pub correlation_id: Option<Uuid>,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct CaepLocationInfo {
248 pub country: Option<String>,
250
251 pub region: Option<String>,
253
254 pub city: Option<String>,
256
257 pub latitude: Option<f64>,
259
260 pub longitude: Option<f64>,
262
263 pub ip_address: Option<String>,
265
266 pub is_suspicious: bool,
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct CaepDeviceInfo {
273 pub device_id: Option<String>,
275
276 pub device_type: Option<String>,
278
279 pub os: Option<String>,
281
282 pub client: Option<String>,
284
285 pub is_trusted: bool,
287
288 pub requires_binding: bool,
290}
291
292#[derive(Debug, Clone, Serialize, Deserialize)]
294pub struct CaepEvaluationRule {
295 pub id: String,
297
298 pub description: String,
300
301 pub applicable_events: Vec<CaepEventType>,
303
304 pub conditions: Vec<CaepRuleCondition>,
306
307 pub actions: Vec<CaepRuleAction>,
309
310 pub priority: i32,
312
313 pub enabled: bool,
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
319#[serde(tag = "type", rename_all = "snake_case")]
320pub enum CaepRuleCondition {
321 RiskScoreAbove { threshold: f32 },
323
324 SeverityAtLeast { severity: CaepEventSeverity },
326
327 LocationChange { suspicious_only: bool },
329
330 UnknownDevice { require_trusted: bool },
332
333 OutsideBusinessHours { timezone: String },
335
336 Custom { expression: String },
338}
339
340#[derive(Debug, Clone, Serialize, Deserialize)]
342#[serde(tag = "type", rename_all = "snake_case")]
343pub enum CaepRuleAction {
344 RevokeAccess { immediate: bool },
346
347 RequireStepUp { level: String },
349
350 SendNotification { channels: Vec<String> },
352
353 LogEvent { level: String },
355
356 TriggerWebhook { url: String },
358
359 QuarantineSession { duration_minutes: u32 },
361}
362
363#[derive(Debug, Clone, Serialize, Deserialize)]
365pub struct CaepEvaluationResult {
366 pub subject: String,
368
369 pub access_decision: CaepAccessDecision,
371
372 pub risk_score: f32,
374
375 pub triggered_rules: Vec<String>,
377
378 pub required_actions: Vec<CaepRuleAction>,
380
381 pub evaluated_at: DateTime<Utc>,
383
384 pub next_evaluation: DateTime<Utc>,
386}
387
388#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
390#[serde(rename_all = "snake_case")]
391pub enum CaepAccessDecision {
392 Allow,
394
395 AllowWithMonitoring,
397
398 AllowWithStepUp,
400
401 TemporaryDeny,
403
404 Deny,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct CaepSessionState {
411 pub session_id: String,
413
414 pub subject: String,
416
417 pub risk_score: f32,
419
420 pub last_evaluation: Option<CaepEvaluationResult>,
422
423 pub active_events: Vec<CaepEvent>,
425
426 pub created_at: DateTime<Utc>,
428
429 pub last_activity: DateTime<Utc>,
431
432 pub is_quarantined: bool,
434
435 pub quarantine_until: Option<DateTime<Utc>>,
437}
438
439#[derive(Debug, Clone, Serialize, Deserialize)]
441pub struct ComprehensiveSessionInfo {
442 pub oidc_session: crate::server::oidc::oidc_session_management::OidcSession,
444
445 pub caep_session: Option<CaepSessionState>,
447
448 pub is_monitored_by_caep: bool,
450}
451
452#[async_trait]
454pub trait CaepEventHandler: Send + Sync {
455 async fn handle_event(&self, event: &CaepEvent) -> Result<()>;
457
458 fn supported_event_types(&self) -> Vec<CaepEventType>;
460}
461
462pub struct CaepManager {
464 config: CaepConfig,
466
467 session_manager: Arc<SessionManager>,
469
470 logout_manager: Arc<BackChannelLogoutManager>,
472
473 step_up_manager: Option<Arc<SteppedUpAuthManager>>,
475
476 sessions: Arc<RwLock<HashMap<String, CaepSessionState>>>,
478
479 event_handlers: EventHandlerMap,
481
482 event_broadcaster: broadcast::Sender<CaepEvent>,
484
485 evaluation_interval: Interval,
487
488 event_history: Arc<RwLock<Vec<CaepEvent>>>,
490
491 rules: Arc<RwLock<Vec<CaepEvaluationRule>>>,
493}
494
495impl CaepManager {
496 pub async fn new(
498 config: CaepConfig,
499 session_manager: Arc<SessionManager>,
500 logout_manager: Arc<BackChannelLogoutManager>,
501 ) -> Result<Self> {
502 let (event_broadcaster, _) = broadcast::channel(1000);
503 let evaluation_interval = interval(config.evaluation_interval.to_std().map_err(|e| {
504 AuthError::Configuration {
505 message: format!("Invalid evaluation interval: {}", e),
506 help: Some("Provide a valid duration for evaluation interval".to_string()),
507 docs_url: Some("https://docs.auth-framework.com/configuration".to_string()),
508 source: None,
509 suggested_fix: Some("Check your configuration and ensure the evaluation interval is properly formatted".to_string()),
510 }
511 })?);
512
513 Ok(Self {
514 config: config.clone(),
515 session_manager,
516 logout_manager,
517 step_up_manager: None,
518 sessions: Arc::new(RwLock::new(HashMap::new())),
519 event_handlers: Arc::new(RwLock::new(HashMap::new())),
520 event_broadcaster,
521 evaluation_interval,
522 event_history: Arc::new(RwLock::new(Vec::new())),
523 rules: Arc::new(RwLock::new(config.evaluation_rules)),
524 })
525 }
526
527 pub fn with_step_up_manager(mut self, step_up_manager: Arc<SteppedUpAuthManager>) -> Self {
529 self.step_up_manager = Some(step_up_manager);
530 self
531 }
532
533 pub async fn register_event_handler(
535 &self,
536 event_type: CaepEventType,
537 handler: Arc<dyn CaepEventHandler>,
538 ) -> Result<()> {
539 let mut handlers = self.event_handlers.write().await;
540 handlers.entry(event_type).or_default().push(handler);
541 Ok(())
542 }
543
544 pub async fn process_event(&self, event: CaepEvent) -> Result<CaepEvaluationResult> {
546 {
548 let mut history = self.event_history.write().await;
549 history.push(event.clone());
550
551 let retention_cutoff = Utc::now() - self.config.event_retention_period;
553 history.retain(|e| e.timestamp >= retention_cutoff);
554 }
555
556 if let Err(e) = self.event_broadcaster.send(event.clone()) {
558 log::warn!("Failed to broadcast CAEP event: {}", e);
559 }
560
561 if let Some(session_id) = &event.session_id {
563 self.update_session_state(session_id, &event).await?;
564 }
565
566 let evaluation_result = self.evaluate_access(&event.subject, Some(&event)).await?;
568
569 self.execute_actions(&evaluation_result).await?;
571
572 self.notify_handlers(&event).await?;
574
575 Ok(evaluation_result)
576 }
577
578 pub async fn evaluate_access(
580 &self,
581 subject: &str,
582 triggering_event: Option<&CaepEvent>,
583 ) -> Result<CaepEvaluationResult> {
584 let rules = self.rules.read().await;
585 let mut triggered_rules = Vec::new();
586 let mut required_actions = Vec::new();
587 let risk_score = if let Some(event) = triggering_event {
588 event.risk_score
589 } else {
590 self.calculate_risk_score(subject).await?
592 };
593
594 for rule in rules.iter() {
596 if !rule.enabled {
597 continue;
598 }
599
600 if let Some(event) = triggering_event
601 && !rule.applicable_events.contains(&event.event_type)
602 {
603 continue;
604 }
605
606 if self
607 .evaluate_rule_conditions(rule, subject, triggering_event, risk_score)
608 .await?
609 {
610 triggered_rules.push(rule.id.clone());
611 required_actions.extend(rule.actions.clone());
612 }
613 }
614
615 let access_decision = self.determine_access_decision(risk_score, &required_actions);
617
618 let now = Utc::now();
619 Ok(CaepEvaluationResult {
620 subject: subject.to_string(),
621 access_decision,
622 risk_score,
623 triggered_rules,
624 required_actions,
625 evaluated_at: now,
626 next_evaluation: now + self.config.evaluation_interval,
627 })
628 }
629
630 pub async fn start_continuous_evaluation(&mut self) -> Result<()> {
632 loop {
633 self.evaluation_interval.tick().await;
634
635 self.synchronize_with_session_manager().await?;
637
638 let sessions = {
640 let sessions_guard = self.sessions.read().await;
641 sessions_guard.keys().cloned().collect::<Vec<_>>()
642 };
643
644 for session_id in sessions {
645 if let Some(session_state) = self.sessions.read().await.get(&session_id) {
646 let evaluation = self.evaluate_access(&session_state.subject, None).await?;
647 self.execute_actions(&evaluation).await?;
648 }
649 }
650 }
651 }
652
653 async fn synchronize_with_session_manager(&self) -> Result<()> {
655 let mut sessions = self.sessions.write().await;
656 let mut sessions_to_remove = Vec::new();
657
658 for (session_id, caep_session) in sessions.iter() {
659 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
661 if !self.session_manager.is_session_valid(session_id) {
662 log::info!("CAEP removing expired session: {}", session_id);
663 sessions_to_remove.push(session_id.clone());
664 }
665 else if oidc_session.sub != caep_session.subject {
667 log::warn!("CAEP session subject mismatch, removing: {}", session_id);
668 sessions_to_remove.push(session_id.clone());
669 }
670 } else {
671 log::info!("CAEP removing orphaned session: {}", session_id);
672 sessions_to_remove.push(session_id.clone());
673 }
674 }
675
676 for session_id in sessions_to_remove {
678 sessions.remove(&session_id);
679 }
680
681 Ok(())
682 }
683
684 pub async fn revoke_subject_access(&self, subject: &str) -> Result<()> {
686 log::info!("CAEP revoking access for subject: {}", subject);
687
688 let sessions_to_logout = {
690 let sessions = self.sessions.read().await;
691 sessions
692 .iter()
693 .filter(|(_, session)| session.subject == subject)
694 .map(|(session_id, session)| (session_id.clone(), session.clone()))
695 .collect::<Vec<_>>()
696 };
697
698 for (session_id, _) in &sessions_to_logout {
700 let logout_request =
702 crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest {
703 session_id: session_id.clone(),
704 sub: subject.to_string(),
705 sid: Some(session_id.clone()),
706 iss: "caep-manager".to_string(), initiating_client_id: None, additional_events: Some({
709 let mut events = HashMap::new();
710 events.insert(
711 "caep_reason".to_string(),
712 serde_json::json!("automatic_revocation"),
713 );
714 events.insert(
715 "timestamp".to_string(),
716 serde_json::json!(Utc::now().timestamp()),
717 );
718 events
719 }),
720 };
721
722 match self.process_backchannel_logout(&logout_request).await {
725 Ok(_) => {
726 log::info!(
727 "Successfully initiated back-channel logout for session {} (subject: {})",
728 session_id,
729 subject
730 );
731 }
732 Err(e) => {
733 log::error!(
734 "Failed to initiate back-channel logout for session {} (subject: {}): {}",
735 session_id,
736 subject,
737 e
738 );
739 }
740 }
741 }
742
743 let mut sessions = self.sessions.write().await;
745 sessions.retain(|_, session| session.subject != subject);
746
747 Ok(())
748 }
749
750 async fn process_backchannel_logout(
752 &self,
753 logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
754 ) -> Result<()> {
755 let logout_metadata = self.logout_manager.get_discovery_metadata();
760 log::info!("Logout manager capabilities: {:?}", logout_metadata);
761
762 let logout_token = self
764 .create_logout_token_for_caep_revocation(logout_request)
765 .await?;
766
767 self.handle_caep_logout(logout_request, &logout_token)
769 .await?;
770
771 log::info!("CAEP backchannel logout processed successfully");
772 Ok(())
773 }
774
775 async fn create_logout_token_for_caep_revocation(
777 &self,
778 logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
779 ) -> Result<String> {
780 use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
781 use serde_json::json;
782
783 let claims = json!({
785 "iss": logout_request.iss,
786 "sub": logout_request.sub,
787 "aud": ["caep-manager"],
788 "exp": (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp(),
789 "iat": chrono::Utc::now().timestamp(),
790 "jti": uuid::Uuid::new_v4().to_string(),
791 "sid": logout_request.sid,
792 "events": {
793 "http://schemas.openid.net/secevent/caep/event-type/session-revoked": {}
794 },
795 "caep_reason": logout_request.additional_events
796 .as_ref()
797 .and_then(|events| events.get("caep_reason"))
798 .unwrap_or(&serde_json::json!("automatic_revocation"))
799 });
800
801 let key = EncodingKey::from_secret("caep-secret".as_ref());
803 let header = Header::new(Algorithm::HS256);
804
805 let token = encode(&header, &claims, &key).map_err(|e| {
806 AuthError::auth_method("caep", format!("Failed to create logout token: {}", e))
807 })?;
808
809 Ok(token)
810 }
811
812 async fn handle_caep_logout(
814 &self,
815 logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
816 logout_token: &str,
817 ) -> Result<()> {
818 log::info!(
826 "Processing CAEP logout for session: {}",
827 logout_request.session_id
828 );
829
830 {
832 let mut sessions = self.sessions.write().await;
833 if let Some(_session) = sessions.get(&logout_request.session_id) {
834 sessions.remove(&logout_request.session_id);
836 }
837 }
838
839 let caep_event = CaepEvent {
841 id: uuid::Uuid::new_v4(),
842 event_type: CaepEventType::UserLogout, subject: logout_request.sub.clone(),
844 session_id: Some(logout_request.session_id.clone()),
845 timestamp: chrono::Utc::now(),
846 severity: CaepEventSeverity::High,
847 source: CaepEventSource {
848 system_id: "caep-manager".to_string(),
849 source_type: "caep_automatic_revocation".to_string(),
850 version: Some("1.0".to_string()),
851 metadata: std::collections::HashMap::new(),
852 },
853 risk_score: 1.0, location: None,
855 device_info: None,
856 event_data: serde_json::json!({
857 "logout_token": logout_token,
858 "initiator": "caep_automatic_revocation",
859 "reason": logout_request.additional_events
860 .as_ref()
861 .and_then(|events| events.get("caep_reason"))
862 .cloned()
863 .unwrap_or_else(|| serde_json::json!("automatic_revocation"))
864 }),
865 correlation_id: Some(uuid::Uuid::new_v4()),
866 };
867
868 if let Err(e) = self.event_broadcaster.send(caep_event) {
870 log::warn!("Failed to broadcast CAEP logout event: {}", e);
871 }
872
873 log::info!(
874 "CAEP logout completed for session: {}",
875 logout_request.session_id
876 );
877 Ok(())
878 }
879
880 async fn calculate_risk_score(&self, subject: &str) -> Result<f32> {
882 let history = self.event_history.read().await;
883 let recent_cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
884
885 let recent_events: Vec<_> = history
886 .iter()
887 .filter(|e| e.subject == subject && e.timestamp >= recent_cutoff)
888 .collect();
889
890 if recent_events.is_empty() {
891 return Ok(0.0);
892 }
893
894 let mut total_risk = 0.0;
896 let mut total_weight = 0.0;
897
898 for event in recent_events {
899 let weight = match event.severity {
900 CaepEventSeverity::Low => 1.0,
901 CaepEventSeverity::Medium => 2.0,
902 CaepEventSeverity::High => 4.0,
903 CaepEventSeverity::Critical => 8.0,
904 };
905
906 total_risk += event.risk_score * weight;
907 total_weight += weight;
908 }
909
910 Ok(if total_weight > 0.0 {
911 (total_risk / total_weight).min(1.0)
912 } else {
913 0.0
914 })
915 }
916
917 async fn update_session_state(&self, session_id: &str, event: &CaepEvent) -> Result<()> {
919 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
921 if !self.session_manager.is_session_valid(session_id) {
923 log::warn!(
924 "CAEP received event for expired OIDC session: {}",
925 session_id
926 );
927 let mut sessions = self.sessions.write().await;
929 sessions.remove(session_id);
930 return Ok(());
931 }
932
933 if oidc_session.sub != event.subject {
935 return Err(AuthError::validation(
936 "Subject mismatch between CAEP event and OIDC session",
937 ));
938 }
939 } else {
940 log::warn!(
941 "CAEP received event for unknown OIDC session: {}",
942 session_id
943 );
944 return Err(AuthError::validation("Session not found in SessionManager"));
945 }
946
947 let mut sessions = self.sessions.write().await;
949
950 let session_state =
951 sessions
952 .entry(session_id.to_string())
953 .or_insert_with(|| CaepSessionState {
954 session_id: session_id.to_string(),
955 subject: event.subject.clone(),
956 risk_score: event.risk_score,
957 last_evaluation: None,
958 active_events: Vec::new(),
959 created_at: Utc::now(),
960 last_activity: Utc::now(),
961 is_quarantined: false,
962 quarantine_until: None,
963 });
964
965 session_state.risk_score = event.risk_score;
966 session_state.last_activity = Utc::now();
967 session_state.active_events.push(event.clone());
968
969 let cutoff = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
971 session_state
972 .active_events
973 .retain(|e| e.timestamp >= cutoff);
974
975 Ok(())
976 }
977
978 async fn evaluate_rule_conditions(
980 &self,
981 rule: &CaepEvaluationRule,
982 _subject: &str,
983 event: Option<&CaepEvent>,
984 risk_score: f32,
985 ) -> Result<bool> {
986 for condition in &rule.conditions {
987 match condition {
988 CaepRuleCondition::RiskScoreAbove { threshold } => {
989 if risk_score <= *threshold {
990 return Ok(false);
991 }
992 }
993 CaepRuleCondition::SeverityAtLeast { severity } => {
994 if let Some(event) = event {
995 let event_severity_level = match event.severity {
996 CaepEventSeverity::Critical => 4,
997 CaepEventSeverity::High => 3,
998 CaepEventSeverity::Medium => 2,
999 CaepEventSeverity::Low => 1,
1000 };
1001
1002 let required_severity_level = match severity {
1003 CaepEventSeverity::Critical => 4,
1004 CaepEventSeverity::High => 3,
1005 CaepEventSeverity::Medium => 2,
1006 CaepEventSeverity::Low => 1,
1007 };
1008
1009 if event_severity_level < required_severity_level {
1010 return Ok(false);
1011 }
1012 } else {
1013 return Ok(false);
1014 }
1015 }
1016 CaepRuleCondition::LocationChange { suspicious_only } => {
1017 if let Some(event) = event {
1018 if let Some(location) = &event.location {
1019 if *suspicious_only && !location.is_suspicious {
1020 return Ok(false);
1021 }
1022 } else {
1023 return Ok(false);
1024 }
1025 } else {
1026 return Ok(false);
1027 }
1028 }
1029 CaepRuleCondition::UnknownDevice { require_trusted } => {
1030 if let Some(event) = event
1031 && let Some(device) = &event.device_info
1032 && *require_trusted
1033 && device.is_trusted
1034 {
1035 return Ok(false);
1036 }
1037 }
1038 CaepRuleCondition::OutsideBusinessHours { timezone: _ } => {
1039 let hour = Utc::now().hour();
1041 if (9..17).contains(&hour) {
1042 return Ok(false);
1043 }
1044 }
1045 CaepRuleCondition::Custom { expression: _ } => {
1046 }
1049 }
1050 }
1051
1052 Ok(true)
1053 }
1054
1055 fn determine_access_decision(
1057 &self,
1058 risk_score: f32,
1059 actions: &[CaepRuleAction],
1060 ) -> CaepAccessDecision {
1061 for action in actions {
1062 match action {
1063 CaepRuleAction::RevokeAccess { immediate: true } => {
1064 return CaepAccessDecision::Deny;
1065 }
1066 CaepRuleAction::RevokeAccess { immediate: false } => {
1067 return CaepAccessDecision::TemporaryDeny;
1068 }
1069 CaepRuleAction::RequireStepUp { .. } => {
1070 return CaepAccessDecision::AllowWithStepUp;
1071 }
1072 CaepRuleAction::QuarantineSession { .. } => {
1073 return CaepAccessDecision::TemporaryDeny;
1074 }
1075 _ => {}
1076 }
1077 }
1078
1079 if risk_score >= self.config.auto_revoke_threshold {
1080 CaepAccessDecision::Deny
1081 } else if risk_score >= 0.6 {
1082 CaepAccessDecision::AllowWithMonitoring
1083 } else {
1084 CaepAccessDecision::Allow
1085 }
1086 }
1087
1088 async fn execute_actions(&self, evaluation: &CaepEvaluationResult) -> Result<()> {
1090 for action in &evaluation.required_actions {
1091 match action {
1092 CaepRuleAction::RevokeAccess { .. } => {
1093 self.revoke_subject_access(&evaluation.subject).await?;
1094 }
1095 CaepRuleAction::RequireStepUp { level } => {
1096 if let Some(_step_up_manager) = &self.step_up_manager {
1097 log::info!(
1099 "CAEP requiring step-up to level {} for subject {}",
1100 level,
1101 evaluation.subject
1102 );
1103 }
1104 }
1105 CaepRuleAction::SendNotification { channels } => {
1106 log::info!(
1107 "CAEP sending notification via channels {:?} for subject {}",
1108 channels,
1109 evaluation.subject
1110 );
1111 }
1112 CaepRuleAction::LogEvent { level } => {
1113 log::info!(
1114 "CAEP logging event at level {} for subject {}",
1115 level,
1116 evaluation.subject
1117 );
1118 }
1119 CaepRuleAction::TriggerWebhook { url } => {
1120 log::info!(
1121 "CAEP triggering webhook {} for subject {}",
1122 url,
1123 evaluation.subject
1124 );
1125 }
1126 CaepRuleAction::QuarantineSession { duration_minutes } => {
1127 self.quarantine_session(&evaluation.subject, *duration_minutes)
1128 .await?;
1129 }
1130 }
1131 }
1132
1133 Ok(())
1134 }
1135
1136 async fn quarantine_session(&self, subject: &str, duration_minutes: u32) -> Result<()> {
1138 let mut sessions = self.sessions.write().await;
1139 let quarantine_until =
1140 Utc::now() + Duration::try_minutes(duration_minutes as i64).unwrap_or(Duration::zero());
1141
1142 let mut quarantined_session_ids = Vec::new();
1144 for session in sessions.values_mut() {
1145 if session.subject == subject {
1146 session.is_quarantined = true;
1147 session.quarantine_until = Some(quarantine_until);
1148 quarantined_session_ids.push(session.session_id.clone());
1149 }
1150 }
1151
1152 log::info!(
1153 "CAEP quarantined {} sessions for subject {} until {}. Session IDs: {:?}",
1154 quarantined_session_ids.len(),
1155 subject,
1156 quarantine_until,
1157 quarantined_session_ids
1158 );
1159
1160 Ok(())
1164 }
1165
1166 async fn notify_handlers(&self, event: &CaepEvent) -> Result<()> {
1168 let handlers = self.event_handlers.read().await;
1169
1170 if let Some(event_handlers) = handlers.get(&event.event_type) {
1171 for handler in event_handlers {
1172 if let Err(e) = handler.handle_event(event).await {
1173 log::error!("CAEP event handler failed: {}", e);
1174 }
1175 }
1176 }
1177
1178 Ok(())
1179 }
1180
1181 pub async fn get_session_state(&self, session_id: &str) -> Result<Option<CaepSessionState>> {
1183 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1185 if !self.session_manager.is_session_valid(session_id) {
1186 let mut sessions = self.sessions.write().await;
1188 sessions.remove(session_id);
1189 return Ok(None);
1190 }
1191
1192 let sessions = self.sessions.read().await;
1194 if let Some(caep_session) = sessions.get(session_id) {
1195 if caep_session.subject == oidc_session.sub {
1196 return Ok(Some(caep_session.clone()));
1197 } else {
1198 log::warn!(
1199 "Subject mismatch between CAEP and OIDC sessions for {}",
1200 session_id
1201 );
1202 return Ok(None);
1203 }
1204 }
1205 }
1206
1207 Ok(None)
1209 }
1210
1211 pub async fn get_event_history(
1213 &self,
1214 subject: &str,
1215 limit: Option<usize>,
1216 ) -> Result<Vec<CaepEvent>> {
1217 let history = self.event_history.read().await;
1218 let mut events: Vec<_> = history
1219 .iter()
1220 .filter(|e| e.subject == subject)
1221 .cloned()
1222 .collect();
1223
1224 events.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
1225
1226 if let Some(limit) = limit {
1227 events.truncate(limit);
1228 }
1229
1230 Ok(events)
1231 }
1232
1233 pub async fn add_evaluation_rule(&self, rule: CaepEvaluationRule) -> Result<()> {
1235 let mut rules = self.rules.write().await;
1236
1237 rules.retain(|r| r.id != rule.id);
1239
1240 rules.push(rule);
1242 rules.sort_by(|a, b| b.priority.cmp(&a.priority));
1243
1244 Ok(())
1245 }
1246
1247 pub async fn remove_evaluation_rule(&self, rule_id: &str) -> Result<bool> {
1249 let mut rules = self.rules.write().await;
1250 let original_len = rules.len();
1251 rules.retain(|r| r.id != rule_id);
1252 Ok(rules.len() < original_len)
1253 }
1254
1255 pub async fn get_comprehensive_session_info(
1257 &self,
1258 session_id: &str,
1259 ) -> Result<Option<ComprehensiveSessionInfo>> {
1260 if let Some(oidc_session) = self.session_manager.get_session(session_id) {
1262 if !self.session_manager.is_session_valid(session_id) {
1263 return Ok(None);
1264 }
1265
1266 let caep_session = {
1268 let sessions = self.sessions.read().await;
1269 sessions.get(session_id).cloned()
1270 };
1271
1272 let comprehensive_info = ComprehensiveSessionInfo {
1273 oidc_session: oidc_session.clone(),
1274 is_monitored_by_caep: caep_session.is_some(),
1275 caep_session,
1276 };
1277
1278 Ok(Some(comprehensive_info))
1279 } else {
1280 Ok(None)
1281 }
1282 }
1283
1284 pub async fn get_subject_sessions(
1286 &self,
1287 subject: &str,
1288 ) -> Result<Vec<ComprehensiveSessionInfo>> {
1289 let oidc_sessions = self.session_manager.get_sessions_for_subject(subject);
1290 let mut comprehensive_sessions = Vec::new();
1291
1292 for oidc_session in oidc_sessions {
1293 if self
1294 .session_manager
1295 .is_session_valid(&oidc_session.session_id)
1296 {
1297 let caep_session = {
1298 let sessions = self.sessions.read().await;
1299 sessions.get(&oidc_session.session_id).cloned()
1300 };
1301
1302 comprehensive_sessions.push(ComprehensiveSessionInfo {
1303 oidc_session: oidc_session.clone(),
1304 is_monitored_by_caep: caep_session.is_some(),
1305 caep_session,
1306 });
1307 }
1308 }
1309
1310 Ok(comprehensive_sessions)
1311 }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316 use super::*;
1317 use tokio;
1318
1319 #[tokio::test]
1320 async fn test_caep_event_creation() {
1321 let event = CaepEvent {
1322 id: Uuid::new_v4(),
1323 event_type: CaepEventType::RiskScoreChange,
1324 subject: "user123".to_string(),
1325 severity: CaepEventSeverity::High,
1326 timestamp: Utc::now(),
1327 source: CaepEventSource {
1328 system_id: "risk_engine".to_string(),
1329 source_type: "ml_model".to_string(),
1330 version: Some("1.0.0".to_string()),
1331 metadata: HashMap::new(),
1332 },
1333 risk_score: 0.85,
1334 session_id: Some("session123".to_string()),
1335 location: None,
1336 device_info: None,
1337 event_data: serde_json::json!({
1338 "previous_score": 0.3,
1339 "new_score": 0.85,
1340 "trigger": "suspicious_login_pattern"
1341 }),
1342 correlation_id: None,
1343 };
1344
1345 assert_eq!(event.subject, "user123");
1346 assert_eq!(event.risk_score, 0.85);
1347 assert!(matches!(event.severity, CaepEventSeverity::High));
1348 }
1349
1350 #[tokio::test]
1351 async fn test_caep_config_creation() {
1352 let config = CaepConfig::default();
1353 assert!(!config.event_stream_url.is_empty());
1354 assert!(config.auto_revoke);
1355 assert_eq!(config.auto_revoke_threshold, 0.8);
1356 }
1357
1358 #[tokio::test]
1359 async fn test_severity_comparison() {
1360 let high_level = match CaepEventSeverity::High {
1362 CaepEventSeverity::Critical => 4,
1363 CaepEventSeverity::High => 3,
1364 CaepEventSeverity::Medium => 2,
1365 CaepEventSeverity::Low => 1,
1366 };
1367
1368 let medium_level = match CaepEventSeverity::Medium {
1369 CaepEventSeverity::Critical => 4,
1370 CaepEventSeverity::High => 3,
1371 CaepEventSeverity::Medium => 2,
1372 CaepEventSeverity::Low => 1,
1373 };
1374
1375 assert!(high_level > medium_level);
1376 }
1377}