use crate::errors::{AuthError, Result};
use crate::server::core::stepped_up_auth::SteppedUpAuthManager;
use crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutManager;
use crate::server::oidc::oidc_session_management::SessionManager;
use async_trait::async_trait;
use chrono::{DateTime, Duration, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use tokio::time::{Interval, interval};
use uuid::Uuid;
/// Shared, lock-protected registry that maps each event type to the handlers
/// registered for it.
type EventHandlerMap = Arc<RwLock<HashMap<CaepEventType, Vec<Arc<dyn CaepEventHandler>>>>>;
/// Configuration for [`CaepManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepConfig {
/// URL of the CAEP event stream. NOTE(review): only the default value is set in
/// this file; no code visible here connects to it.
pub event_stream_url: String,
/// Period of the continuous-evaluation timer; also added to the evaluation time
/// to produce `CaepEvaluationResult::next_evaluation`.
pub evaluation_interval: Duration,
/// Automatic-revocation switch. NOTE(review): not read by any code visible in
/// this file — confirm where it is enforced.
pub auto_revoke: bool,
/// Risk score at or above which the access decision is `Deny` when no rule
/// action has already decided the outcome.
pub auto_revoke_threshold: f32,
/// NOTE(review): not read by any code visible in this file.
pub max_concurrent_processors: usize,
/// Events older than this are dropped from the event history whenever a new
/// event is processed.
pub event_retention_period: Duration,
/// NOTE(review): not read by any code visible in this file.
pub propagation_endpoints: Vec<String>,
/// Rules the manager starts with; more can be added or removed at runtime.
pub evaluation_rules: Vec<CaepEvaluationRule>,
}
impl Default for CaepConfig {
    /// Default configuration: local `wss://` event stream, 30-second evaluation
    /// interval, auto-revoke enabled at a 0.8 threshold, 10 processors, 24-hour
    /// event retention, and no endpoints or rules.
    fn default() -> Self {
        // The `try_*` constructors only fail on overflow; fall back to zero then.
        let evaluation_interval = Duration::try_seconds(30).unwrap_or_else(Duration::zero);
        let event_retention_period = Duration::try_hours(24).unwrap_or_else(Duration::zero);

        Self {
            event_stream_url: String::from("wss://localhost:8080/caep/events"),
            evaluation_interval,
            auto_revoke: true,
            auto_revoke_threshold: 0.8,
            max_concurrent_processors: 10,
            event_retention_period,
            propagation_endpoints: vec![],
            evaluation_rules: vec![],
        }
    }
}
/// Kinds of CAEP events the manager can receive or emit.
///
/// Serialized with snake_case variant names. Derives `Eq` and `Hash` because the
/// event type is the key of the handler registry.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CaepEventType {
UserLogin,
/// Also the type of the event the manager broadcasts after a CAEP-initiated logout.
UserLogout,
UserProfileChange,
UserCredentialChange,
SessionCreated,
SessionModified,
SessionTimeout,
SessionSuspiciousActivity,
RiskScoreChange,
LocationChange,
DeviceChange,
BehavioralAnomaly,
PolicyUpdate,
ComplianceViolation,
AccessPatternAnomaly,
SystemOutage,
SecurityIncident,
DataBreach,
/// Caller-defined event type identified by an arbitrary string.
Custom(String),
}
/// Severity attached to a CAEP event.
///
/// The rule evaluator ranks these `Low < Medium < High < Critical`, and the risk
/// calculation weights them 1, 2, 4 and 8 respectively.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CaepEventSeverity {
Low,
Medium,
High,
Critical,
}
/// Describes the system that produced a CAEP event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepEventSource {
/// Identifier of the emitting system (the manager itself uses `"caep-manager"`).
pub system_id: String,
/// Free-form category of the source.
pub source_type: String,
/// Optional version string of the source.
pub version: Option<String>,
/// Arbitrary extra JSON values describing the source.
pub metadata: HashMap<String, serde_json::Value>,
}
/// A single CAEP event as processed by [`CaepManager::process_event`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepEvent {
/// Unique identifier of this event.
pub id: Uuid,
/// Kind of event; selects which rules apply and which handlers are notified.
pub event_type: CaepEventType,
/// Subject the event is about; compared against the OIDC session's `sub`.
pub subject: String,
/// Severity of the event.
pub severity: CaepEventSeverity,
/// Event time; used for retention and for the one-hour "recent" windows.
pub timestamp: DateTime<Utc>,
/// System that produced the event.
pub source: CaepEventSource,
/// Risk score carried by the event; used as-is when the event triggers an evaluation.
pub risk_score: f32,
/// Session the event belongs to; when present the per-session CAEP state is updated.
pub session_id: Option<String>,
/// Optional location information, consulted by `LocationChange` conditions.
pub location: Option<CaepLocationInfo>,
/// Optional device information, consulted by `UnknownDevice` conditions.
pub device_info: Option<CaepDeviceInfo>,
/// Free-form JSON payload.
pub event_data: serde_json::Value,
/// Optional identifier for correlating related events.
pub correlation_id: Option<Uuid>,
}
/// Location details attached to an event. All geographic fields are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepLocationInfo {
pub country: Option<String>,
pub region: Option<String>,
pub city: Option<String>,
pub latitude: Option<f64>,
pub longitude: Option<f64>,
pub ip_address: Option<String>,
/// Read by `LocationChange { suspicious_only: true }` conditions, which hold
/// only when this flag is set.
pub is_suspicious: bool,
}
/// Device details attached to an event. All descriptive fields are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepDeviceInfo {
pub device_id: Option<String>,
pub device_type: Option<String>,
pub os: Option<String>,
pub client: Option<String>,
/// Read by `UnknownDevice { require_trusted: true }` conditions, which fail
/// when this flag is set.
pub is_trusted: bool,
/// NOTE(review): not read by any code visible in this file.
pub requires_binding: bool,
}
/// A policy rule: when every condition holds, the rule's actions are required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepEvaluationRule {
/// Unique rule identifier; adding a rule with an existing id replaces the old one.
pub id: String,
/// Human-readable description.
pub description: String,
/// Event types the rule applies to. Only checked when an evaluation is
/// triggered by an event; timer-driven evaluations consider every enabled rule.
pub applicable_events: Vec<CaepEventType>,
/// Conditions that must all hold for the rule to trigger (an empty list always triggers).
pub conditions: Vec<CaepRuleCondition>,
/// Actions added to the evaluation result when the rule triggers.
pub actions: Vec<CaepRuleAction>,
/// Rules are kept sorted by descending priority after `add_evaluation_rule`.
pub priority: i32,
/// Disabled rules are skipped during evaluation.
pub enabled: bool,
}
/// A single rule condition, serialized as an internally tagged object
/// (`"type"` holds the snake_case variant name).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CaepRuleCondition {
/// Holds when the evaluated risk score is strictly greater than `threshold`.
RiskScoreAbove { threshold: f32 },
/// Holds when there is a triggering event whose severity ranks at or above `severity`.
SeverityAtLeast { severity: CaepEventSeverity },
/// Holds when the triggering event carries location info; with
/// `suspicious_only`, the location must also be flagged suspicious.
LocationChange { suspicious_only: bool },
/// Fails only when `require_trusted` is set and the triggering event carries
/// device info marked trusted; holds in every other case.
UnknownDevice { require_trusted: bool },
/// Holds when the current hour is outside 09:00–17:00.
/// NOTE(review): the evaluator ignores `timezone` and always uses UTC.
OutsideBusinessHours { timezone: String },
/// NOTE(review): the evaluator does not interpret `expression`; this condition
/// currently always holds.
Custom { expression: String },
}
/// An action required by a triggered rule, serialized as an internally tagged
/// object (`"type"` holds the snake_case variant name).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CaepRuleAction {
/// Revokes all of the subject's sessions. `immediate` only affects the access
/// decision (`Deny` when true, `TemporaryDeny` otherwise).
RevokeAccess { immediate: bool },
/// Yields an `AllowWithStepUp` decision; execution currently only logs, and
/// only when a step-up manager is configured.
RequireStepUp { level: String },
/// Execution currently only logs the channels.
SendNotification { channels: Vec<String> },
/// Execution currently logs at info level regardless of `level`.
LogEvent { level: String },
/// Execution currently only logs the URL; no request is sent by code in this file.
TriggerWebhook { url: String },
/// Marks every CAEP session of the subject as quarantined for the given duration.
QuarantineSession { duration_minutes: u32 },
}
/// Outcome of one access evaluation for a subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepEvaluationResult {
/// Subject that was evaluated.
pub subject: String,
/// Decision derived from the required actions and the risk score.
pub access_decision: CaepAccessDecision,
/// Risk score the evaluation used.
pub risk_score: f32,
/// Ids of the rules whose conditions all held.
pub triggered_rules: Vec<String>,
/// Actions collected from the triggered rules, in rule order.
pub required_actions: Vec<CaepRuleAction>,
/// When the evaluation ran.
pub evaluated_at: DateTime<Utc>,
/// `evaluated_at` plus the configured evaluation interval.
pub next_evaluation: DateTime<Utc>,
}
/// Access decision produced by an evaluation (serialized in snake_case).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CaepAccessDecision {
/// No decisive action and risk score below 0.6.
Allow,
/// No decisive action and risk score at least 0.6 but below the auto-revoke threshold.
AllowWithMonitoring,
/// A `RequireStepUp` action was the first decisive action.
AllowWithStepUp,
/// A non-immediate `RevokeAccess` or a `QuarantineSession` was the first decisive action.
TemporaryDeny,
/// An immediate `RevokeAccess` was the first decisive action, or the risk score
/// reached the auto-revoke threshold.
Deny,
}
/// CAEP-side state tracked for one OIDC session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CaepSessionState {
/// Id of the OIDC session this state mirrors.
pub session_id: String,
/// Subject owning the session.
pub subject: String,
/// Risk score of the most recent event seen for the session.
pub risk_score: f32,
/// NOTE(review): initialised to `None` and never assigned by code visible in this file.
pub last_evaluation: Option<CaepEvaluationResult>,
/// Events seen for the session, pruned to the last hour on every update.
pub active_events: Vec<CaepEvent>,
/// When the CAEP state was first created.
pub created_at: DateTime<Utc>,
/// When the last event for the session was processed.
pub last_activity: DateTime<Utc>,
/// Set by a `QuarantineSession` action.
pub is_quarantined: bool,
/// End of the quarantine period, set together with `is_quarantined`.
pub quarantine_until: Option<DateTime<Utc>>,
}
/// Combined view of an OIDC session and its CAEP state, if any.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComprehensiveSessionInfo {
/// The OIDC session as returned by the session manager.
pub oidc_session: crate::server::oidc::oidc_session_management::OidcSession,
/// CAEP state for the same session id, when the manager tracks one.
pub caep_session: Option<CaepSessionState>,
/// `true` exactly when `caep_session` is `Some`.
pub is_monitored_by_caep: bool,
}
/// Callback interface for components that want to react to CAEP events.
#[async_trait]
pub trait CaepEventHandler: Send + Sync {
/// Called for each processed event whose type the handler was registered for.
/// An `Err` is logged by the manager and does not stop other handlers.
async fn handle_event(&self, event: &CaepEvent) -> Result<()>;
/// Event types the handler declares support for.
/// NOTE(review): not consulted by code visible in this file; registration
/// takes the event type explicitly.
fn supported_event_types(&self) -> Vec<CaepEventType>;
}
/// Coordinates CAEP event processing, rule evaluation and enforcement actions.
pub struct CaepManager {
// Configuration captured at construction time.
config: CaepConfig,
// Source of truth for OIDC sessions; CAEP state is validated against it.
session_manager: Arc<SessionManager>,
// Back-channel logout manager; only its discovery metadata is read in this file.
logout_manager: Arc<BackChannelLogoutManager>,
// Optional step-up manager, set through `with_step_up_manager`.
step_up_manager: Option<Arc<SteppedUpAuthManager>>,
// CAEP session state keyed by session id.
sessions: Arc<RwLock<HashMap<String, CaepSessionState>>>,
// Registered handlers keyed by event type.
event_handlers: EventHandlerMap,
// Broadcast channel every processed event is sent on.
event_broadcaster: broadcast::Sender<CaepEvent>,
// Timer driving `start_continuous_evaluation`.
evaluation_interval: Interval,
// Processed events, pruned by the configured retention period.
event_history: Arc<RwLock<Vec<CaepEvent>>>,
// Active evaluation rules.
rules: Arc<RwLock<Vec<CaepEvaluationRule>>>,
}
impl CaepManager {
/// Builds a manager from `config` and the OIDC session / back-channel logout managers.
///
/// The rules supplied in `config.evaluation_rules` are sorted by descending
/// priority, matching the order `add_evaluation_rule` maintains.
///
/// # Errors
/// Returns `AuthError::Configuration` when `config.evaluation_interval` is
/// negative (it cannot be converted to a std duration) or zero
/// (`tokio::time::interval` panics on a zero period).
pub async fn new(
    config: CaepConfig,
    session_manager: Arc<SessionManager>,
    logout_manager: Arc<BackChannelLogoutManager>,
) -> Result<Self> {
    // Single place that builds the configuration error for both failure modes.
    let invalid_interval = |detail: String| AuthError::Configuration {
        message: format!("Invalid evaluation interval: {}", detail),
        help: Some("Provide a valid duration for evaluation interval".to_string()),
        docs_url: Some("https://docs.auth-framework.com/configuration".to_string()),
        source: None,
        suggested_fix: Some("Check your configuration and ensure the evaluation interval is properly formatted".to_string()),
    };

    let period = config
        .evaluation_interval
        .to_std()
        .map_err(|e| invalid_interval(e.to_string()))?;
    // A zero period converts fine but would make `interval` panic, so reject it here.
    if period.is_zero() {
        return Err(invalid_interval(
            "interval must be greater than zero".to_string(),
        ));
    }
    let evaluation_interval = interval(period);

    let (event_broadcaster, _) = broadcast::channel(1000);

    // Keep the same ordering invariant as `add_evaluation_rule` (stable sort,
    // highest priority first) so priorities matter from the first evaluation.
    let mut rules = config.evaluation_rules.clone();
    rules.sort_by(|a, b| b.priority.cmp(&a.priority));

    Ok(Self {
        config,
        session_manager,
        logout_manager,
        step_up_manager: None,
        sessions: Arc::new(RwLock::new(HashMap::new())),
        event_handlers: Arc::new(RwLock::new(HashMap::new())),
        event_broadcaster,
        evaluation_interval,
        event_history: Arc::new(RwLock::new(Vec::new())),
        rules: Arc::new(RwLock::new(rules)),
    })
}
/// Builder-style setter: returns the manager with `step_up_manager` attached.
pub fn with_step_up_manager(self, step_up_manager: Arc<SteppedUpAuthManager>) -> Self {
    let mut manager = self;
    manager.step_up_manager = Some(step_up_manager);
    manager
}
/// Registers `handler` to be notified of events of type `event_type`.
///
/// Several handlers may be registered for the same type; they are kept in
/// registration order. Always returns `Ok(())`.
pub async fn register_event_handler(
    &self,
    event_type: CaepEventType,
    handler: Arc<dyn CaepEventHandler>,
) -> Result<()> {
    self.event_handlers
        .write()
        .await
        .entry(event_type)
        .or_default()
        .push(handler);
    Ok(())
}
/// Runs the full pipeline for one incoming event.
///
/// Steps, in order: record the event and prune expired history, broadcast it
/// (a failed broadcast is only logged), update the per-session state when the
/// event names a session, evaluate access for the subject, execute the required
/// actions, and notify the registered handlers.
///
/// # Errors
/// Propagates errors from the session update, the evaluation and the action
/// execution.
pub async fn process_event(&self, event: CaepEvent) -> Result<CaepEvaluationResult> {
    // Scoped so the history write lock is released before the remaining awaits.
    {
        let mut recorded_events = self.event_history.write().await;
        recorded_events.push(event.clone());
        let oldest_kept = Utc::now() - self.config.event_retention_period;
        recorded_events.retain(|recorded| recorded.timestamp >= oldest_kept);
    }

    if let Err(send_error) = self.event_broadcaster.send(event.clone()) {
        log::warn!("Failed to broadcast CAEP event: {}", send_error);
    }

    if let Some(session_id) = event.session_id.as_deref() {
        self.update_session_state(session_id, &event).await?;
    }

    let outcome = self.evaluate_access(&event.subject, Some(&event)).await?;
    self.execute_actions(&outcome).await?;
    self.notify_handlers(&event).await?;
    Ok(outcome)
}
/// Evaluates every enabled rule for `subject` and derives an access decision.
///
/// When `triggering_event` is given, its risk score is used and only rules
/// listing the event's type are considered. Without an event the risk score is
/// computed from recent history and every enabled rule is considered.
///
/// # Errors
/// Propagates errors from the risk calculation and rule evaluation.
pub async fn evaluate_access(
    &self,
    subject: &str,
    triggering_event: Option<&CaepEvent>,
) -> Result<CaepEvaluationResult> {
    let rules = self.rules.read().await;

    let risk_score = match triggering_event {
        Some(event) => event.risk_score,
        None => self.calculate_risk_score(subject).await?,
    };

    let mut triggered_rules = Vec::new();
    let mut required_actions = Vec::new();
    for rule in rules.iter().filter(|candidate| candidate.enabled) {
        // With a triggering event, the rule must list that event's type.
        let applies = triggering_event
            .is_none_or(|event| rule.applicable_events.contains(&event.event_type));
        if !applies {
            continue;
        }

        let conditions_hold = self
            .evaluate_rule_conditions(rule, subject, triggering_event, risk_score)
            .await?;
        if conditions_hold {
            triggered_rules.push(rule.id.clone());
            required_actions.extend(rule.actions.clone());
        }
    }

    let access_decision = self.determine_access_decision(risk_score, &required_actions);
    let evaluated_at = Utc::now();
    let next_evaluation = evaluated_at + self.config.evaluation_interval;

    Ok(CaepEvaluationResult {
        subject: subject.to_string(),
        access_decision,
        risk_score,
        triggered_rules,
        required_actions,
        evaluated_at,
        next_evaluation,
    })
}
/// Runs the periodic evaluation loop; only returns when a step fails.
///
/// On every tick the CAEP sessions are reconciled with the session manager,
/// then each remaining session's subject is re-evaluated (without a triggering
/// event) and the resulting actions are executed.
///
/// # Errors
/// Returns the first error from synchronization, evaluation or action execution.
pub async fn start_continuous_evaluation(&mut self) -> Result<()> {
    loop {
        self.evaluation_interval.tick().await;
        self.synchronize_with_session_manager().await?;

        // Snapshot the ids; the guard is a temporary dropped at the end of this statement.
        let session_ids: Vec<String> = self.sessions.read().await.keys().cloned().collect();

        for session_id in session_ids {
            // Copy the subject out and release the read guard BEFORE evaluating.
            // `execute_actions` may take the `sessions` write lock (revocation,
            // quarantine); holding a read guard across it would deadlock this task.
            let subject = {
                let sessions = self.sessions.read().await;
                sessions
                    .get(&session_id)
                    .map(|session_state| session_state.subject.clone())
            };
            // The session may have been removed by an earlier iteration's actions.
            let Some(subject) = subject else {
                continue;
            };

            let evaluation = self.evaluate_access(&subject, None).await?;
            self.execute_actions(&evaluation).await?;
        }
    }
}
/// Drops CAEP session state that no longer matches the session manager.
///
/// A session is removed when the session manager does not know it, reports it
/// as invalid, or reports a different subject for it. Always returns `Ok(())`.
async fn synchronize_with_session_manager(&self) -> Result<()> {
    let mut sessions = self.sessions.write().await;

    sessions.retain(|session_id, caep_session| {
        let Some(oidc_session) = self.session_manager.get_session(session_id) else {
            log::info!("CAEP removing orphaned session: {}", session_id);
            return false;
        };

        if !self.session_manager.is_session_valid(session_id) {
            log::info!("CAEP removing expired session: {}", session_id);
            return false;
        }

        if oidc_session.sub != caep_session.subject {
            log::warn!("CAEP session subject mismatch, removing: {}", session_id);
            return false;
        }

        true
    });

    Ok(())
}
/// Revokes access for `subject` by logging out each of its CAEP sessions.
///
/// A back-channel logout is attempted per session; failures are logged and do
/// not stop the remaining sessions. Afterwards all CAEP state for the subject is
/// removed. Always returns `Ok(())`.
pub async fn revoke_subject_access(&self, subject: &str) -> Result<()> {
    use crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest;

    log::info!("CAEP revoking access for subject: {}", subject);

    // Snapshot the affected ids so no lock is held while logging out.
    let affected_session_ids: Vec<String> = {
        let sessions = self.sessions.read().await;
        sessions
            .iter()
            .filter(|(_, session)| session.subject == subject)
            .map(|(session_id, _)| session_id.clone())
            .collect()
    };

    for session_id in &affected_session_ids {
        let additional_events = HashMap::from([
            (
                "caep_reason".to_string(),
                serde_json::json!("automatic_revocation"),
            ),
            (
                "timestamp".to_string(),
                serde_json::json!(Utc::now().timestamp()),
            ),
        ]);
        let logout_request = BackChannelLogoutRequest {
            session_id: session_id.clone(),
            sub: subject.to_string(),
            sid: Some(session_id.clone()),
            iss: "caep-manager".to_string(),
            initiating_client_id: None,
            additional_events: Some(additional_events),
        };

        if let Err(e) = self.process_backchannel_logout(&logout_request).await {
            log::error!(
                "Failed to initiate back-channel logout for session {} (subject: {}): {}",
                session_id,
                subject,
                e
            );
        } else {
            log::info!(
                "Successfully initiated back-channel logout for session {} (subject: {})",
                session_id,
                subject
            );
        }
    }

    // Final sweep: also covers sessions created for the subject in the meantime.
    self.sessions
        .write()
        .await
        .retain(|_, session| session.subject != subject);
    Ok(())
}
/// Performs the CAEP side of a back-channel logout for one request.
///
/// Logs the logout manager's discovery metadata, creates a logout token for the
/// request and hands both to `handle_caep_logout`.
///
/// # Errors
/// Propagates token-creation and logout-handling errors.
async fn process_backchannel_logout(
    &self,
    logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
) -> Result<()> {
    let capabilities = self.logout_manager.get_discovery_metadata();
    log::info!("Logout manager capabilities: {:?}", capabilities);

    let token = self
        .create_logout_token_for_caep_revocation(logout_request)
        .await?;
    self.handle_caep_logout(logout_request, &token).await?;

    log::info!("CAEP backchannel logout processed successfully");
    Ok(())
}
async fn create_logout_token_for_caep_revocation(
&self,
logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
) -> Result<String> {
use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
use serde_json::json;
let claims = json!({
"iss": logout_request.iss,
"sub": logout_request.sub,
"aud": ["caep-manager"],
"exp": (chrono::Utc::now() + chrono::Duration::minutes(5)).timestamp(),
"iat": chrono::Utc::now().timestamp(),
"jti": uuid::Uuid::new_v4().to_string(),
"sid": logout_request.sid,
"events": {
"http://schemas.openid.net/secevent/caep/event-type/session-revoked": {}
},
"caep_reason": logout_request.additional_events
.as_ref()
.and_then(|events| events.get("caep_reason"))
.unwrap_or(&serde_json::json!("automatic_revocation"))
});
let key = EncodingKey::from_secret("caep-secret".as_ref());
let header = Header::new(Algorithm::HS256);
let token = encode(&header, &claims, &key).map_err(|e| {
AuthError::auth_method("caep", format!("Failed to create logout token: {}", e))
})?;
Ok(token)
}
async fn handle_caep_logout(
&self,
logout_request: &crate::server::oidc::oidc_backchannel_logout::BackChannelLogoutRequest,
logout_token: &str,
) -> Result<()> {
log::info!(
"Processing CAEP logout for session: {}",
logout_request.session_id
);
{
let mut sessions = self.sessions.write().await;
if let Some(_session) = sessions.get(&logout_request.session_id) {
sessions.remove(&logout_request.session_id);
}
}
let caep_event = CaepEvent {
id: uuid::Uuid::new_v4(),
event_type: CaepEventType::UserLogout, subject: logout_request.sub.clone(),
session_id: Some(logout_request.session_id.clone()),
timestamp: chrono::Utc::now(),
severity: CaepEventSeverity::High,
source: CaepEventSource {
system_id: "caep-manager".to_string(),
source_type: "caep_automatic_revocation".to_string(),
version: Some("1.0".to_string()),
metadata: std::collections::HashMap::new(),
},
risk_score: 1.0, location: None,
device_info: None,
event_data: serde_json::json!({
"logout_token": logout_token,
"initiator": "caep_automatic_revocation",
"reason": logout_request.additional_events
.as_ref()
.and_then(|events| events.get("caep_reason"))
.cloned()
.unwrap_or_else(|| serde_json::json!("automatic_revocation"))
}),
correlation_id: Some(uuid::Uuid::new_v4()),
};
if let Err(e) = self.event_broadcaster.send(caep_event) {
log::warn!("Failed to broadcast CAEP logout event: {}", e);
}
log::info!(
"CAEP logout completed for session: {}",
logout_request.session_id
);
Ok(())
}
/// Computes a severity-weighted average risk score for `subject`.
///
/// Only events from the last hour are considered; each event's risk score is
/// weighted 1/2/4/8 for Low/Medium/High/Critical severity. The result is capped
/// at 1.0, and is 0.0 when the subject has no recent events.
async fn calculate_risk_score(&self, subject: &str) -> Result<f32> {
    let history = self.event_history.read().await;
    let window_start = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());

    // Accumulate (weighted risk, total weight) in history order.
    let (weighted_risk, weight_sum) = history
        .iter()
        .filter(|e| e.subject == subject && e.timestamp >= window_start)
        .fold((0.0_f32, 0.0_f32), |(risk_acc, weight_acc), event| {
            let weight: f32 = match event.severity {
                CaepEventSeverity::Low => 1.0,
                CaepEventSeverity::Medium => 2.0,
                CaepEventSeverity::High => 4.0,
                CaepEventSeverity::Critical => 8.0,
            };
            (risk_acc + event.risk_score * weight, weight_acc + weight)
        });

    // No matching events leaves the weight at zero.
    if weight_sum > 0.0 {
        Ok((weighted_risk / weight_sum).min(1.0))
    } else {
        Ok(0.0)
    }
}
/// Records `event` against the CAEP state of `session_id`, creating it if needed.
///
/// The session must exist in the session manager and belong to the event's
/// subject. If the OIDC session is no longer valid, the CAEP state is dropped
/// and `Ok(())` is returned without recording the event.
///
/// # Errors
/// Returns a validation error when the session is unknown to the session
/// manager or its subject differs from the event's subject.
async fn update_session_state(&self, session_id: &str, event: &CaepEvent) -> Result<()> {
    let Some(oidc_session) = self.session_manager.get_session(session_id) else {
        log::warn!(
            "CAEP received event for unknown OIDC session: {}",
            session_id
        );
        return Err(AuthError::validation("Session not found in SessionManager"));
    };

    if !self.session_manager.is_session_valid(session_id) {
        log::warn!(
            "CAEP received event for expired OIDC session: {}",
            session_id
        );
        self.sessions.write().await.remove(session_id);
        return Ok(());
    }

    if oidc_session.sub != event.subject {
        return Err(AuthError::validation(
            "Subject mismatch between CAEP event and OIDC session",
        ));
    }

    let mut sessions = self.sessions.write().await;
    let state = sessions
        .entry(session_id.to_string())
        .or_insert_with(|| CaepSessionState {
            session_id: session_id.to_string(),
            subject: event.subject.clone(),
            risk_score: event.risk_score,
            last_evaluation: None,
            active_events: Vec::new(),
            created_at: Utc::now(),
            last_activity: Utc::now(),
            is_quarantined: false,
            quarantine_until: None,
        });

    state.risk_score = event.risk_score;
    state.last_activity = Utc::now();
    state.active_events.push(event.clone());

    // Keep only the last hour of events on the session.
    let oldest_kept = Utc::now() - Duration::try_hours(1).unwrap_or(Duration::zero());
    state
        .active_events
        .retain(|recorded| recorded.timestamp >= oldest_kept);
    Ok(())
}
async fn evaluate_rule_conditions(
&self,
rule: &CaepEvaluationRule,
_subject: &str,
event: Option<&CaepEvent>,
risk_score: f32,
) -> Result<bool> {
for condition in &rule.conditions {
match condition {
CaepRuleCondition::RiskScoreAbove { threshold } => {
if risk_score <= *threshold {
return Ok(false);
}
}
CaepRuleCondition::SeverityAtLeast { severity } => {
if let Some(event) = event {
let event_severity_level = match event.severity {
CaepEventSeverity::Critical => 4,
CaepEventSeverity::High => 3,
CaepEventSeverity::Medium => 2,
CaepEventSeverity::Low => 1,
};
let required_severity_level = match severity {
CaepEventSeverity::Critical => 4,
CaepEventSeverity::High => 3,
CaepEventSeverity::Medium => 2,
CaepEventSeverity::Low => 1,
};
if event_severity_level < required_severity_level {
return Ok(false);
}
} else {
return Ok(false);
}
}
CaepRuleCondition::LocationChange { suspicious_only } => {
if let Some(event) = event {
if let Some(location) = &event.location {
if *suspicious_only && !location.is_suspicious {
return Ok(false);
}
} else {
return Ok(false);
}
} else {
return Ok(false);
}
}
CaepRuleCondition::UnknownDevice { require_trusted } => {
if let Some(event) = event
&& let Some(device) = &event.device_info
&& *require_trusted
&& device.is_trusted
{
return Ok(false);
}
}
CaepRuleCondition::OutsideBusinessHours { timezone: _ } => {
let hour = Utc::now().hour();
if (9..17).contains(&hour) {
return Ok(false);
}
}
CaepRuleCondition::Custom { expression: _ } => {
}
}
}
Ok(true)
}
/// Derives the access decision from the required actions and the risk score.
///
/// The first decisive action in `actions` wins: immediate revoke → `Deny`,
/// non-immediate revoke or quarantine → `TemporaryDeny`, step-up →
/// `AllowWithStepUp`. Without a decisive action, the risk score decides: at or
/// above the auto-revoke threshold → `Deny`, at or above 0.6 →
/// `AllowWithMonitoring`, otherwise `Allow`.
fn determine_access_decision(
    &self,
    risk_score: f32,
    actions: &[CaepRuleAction],
) -> CaepAccessDecision {
    let decided_by_action = actions.iter().find_map(|action| match action {
        CaepRuleAction::RevokeAccess { immediate } => Some(if *immediate {
            CaepAccessDecision::Deny
        } else {
            CaepAccessDecision::TemporaryDeny
        }),
        CaepRuleAction::RequireStepUp { .. } => Some(CaepAccessDecision::AllowWithStepUp),
        CaepRuleAction::QuarantineSession { .. } => Some(CaepAccessDecision::TemporaryDeny),
        // Informational actions do not influence the decision.
        CaepRuleAction::SendNotification { .. }
        | CaepRuleAction::LogEvent { .. }
        | CaepRuleAction::TriggerWebhook { .. } => None,
    });

    if let Some(decision) = decided_by_action {
        return decision;
    }

    if risk_score >= self.config.auto_revoke_threshold {
        return CaepAccessDecision::Deny;
    }
    if risk_score >= 0.6 {
        return CaepAccessDecision::AllowWithMonitoring;
    }
    CaepAccessDecision::Allow
}
/// Executes every required action of `evaluation`, in order.
///
/// Revocation and quarantine change session state; step-up, notification,
/// log and webhook actions currently only write a log line (step-up only when a
/// step-up manager is configured).
///
/// # Errors
/// Propagates errors from revocation and quarantine.
async fn execute_actions(&self, evaluation: &CaepEvaluationResult) -> Result<()> {
    let subject = &evaluation.subject;

    for required in evaluation.required_actions.iter() {
        match required {
            CaepRuleAction::RevokeAccess { .. } => {
                self.revoke_subject_access(subject).await?;
            }
            CaepRuleAction::QuarantineSession { duration_minutes } => {
                self.quarantine_session(subject, *duration_minutes).await?;
            }
            CaepRuleAction::RequireStepUp { level } => {
                if self.step_up_manager.is_some() {
                    log::info!(
                        "CAEP requiring step-up to level {} for subject {}",
                        level,
                        subject
                    );
                }
            }
            CaepRuleAction::SendNotification { channels } => {
                log::info!(
                    "CAEP sending notification via channels {:?} for subject {}",
                    channels,
                    subject
                );
            }
            CaepRuleAction::LogEvent { level } => {
                log::info!(
                    "CAEP logging event at level {} for subject {}",
                    level,
                    subject
                );
            }
            CaepRuleAction::TriggerWebhook { url } => {
                log::info!("CAEP triggering webhook {} for subject {}", url, subject);
            }
        }
    }
    Ok(())
}
/// Marks every CAEP session of `subject` as quarantined for `duration_minutes`
/// from now and logs the affected session ids. Always returns `Ok(())`.
async fn quarantine_session(&self, subject: &str, duration_minutes: u32) -> Result<()> {
    let mut sessions = self.sessions.write().await;
    let quarantine_length =
        Duration::try_minutes(i64::from(duration_minutes)).unwrap_or(Duration::zero());
    let quarantine_until = Utc::now() + quarantine_length;

    let quarantined_session_ids: Vec<String> = sessions
        .values_mut()
        .filter(|session| session.subject == subject)
        .map(|session| {
            session.is_quarantined = true;
            session.quarantine_until = Some(quarantine_until);
            session.session_id.clone()
        })
        .collect();

    log::info!(
        "CAEP quarantined {} sessions for subject {} until {}. Session IDs: {:?}",
        quarantined_session_ids.len(),
        subject,
        quarantine_until,
        quarantined_session_ids
    );
    Ok(())
}
async fn notify_handlers(&self, event: &CaepEvent) -> Result<()> {
let handlers = self.event_handlers.read().await;
if let Some(event_handlers) = handlers.get(&event.event_type) {
for handler in event_handlers {
if let Err(e) = handler.handle_event(event).await {
log::error!("CAEP event handler failed: {}", e);
}
}
}
Ok(())
}
/// Returns the CAEP state for `session_id`, if it is tracked and consistent.
///
/// Yields `Ok(None)` when the session manager does not know the session, when
/// the OIDC session is no longer valid (the CAEP state is then removed), when
/// no CAEP state exists, or when the CAEP and OIDC subjects differ.
pub async fn get_session_state(&self, session_id: &str) -> Result<Option<CaepSessionState>> {
    let Some(oidc_session) = self.session_manager.get_session(session_id) else {
        return Ok(None);
    };

    if !self.session_manager.is_session_valid(session_id) {
        // Drop stale CAEP state for a session that is no longer valid.
        self.sessions.write().await.remove(session_id);
        return Ok(None);
    }

    let sessions = self.sessions.read().await;
    let Some(caep_session) = sessions.get(session_id) else {
        return Ok(None);
    };

    if caep_session.subject != oidc_session.sub {
        log::warn!(
            "Subject mismatch between CAEP and OIDC sessions for {}",
            session_id
        );
        return Ok(None);
    }

    Ok(Some(caep_session.clone()))
}
/// Returns the retained events for `subject`, newest first, optionally limited
/// to the first `limit` entries.
pub async fn get_event_history(
    &self,
    subject: &str,
    limit: Option<usize>,
) -> Result<Vec<CaepEvent>> {
    let mut matching: Vec<CaepEvent> = {
        let history = self.event_history.read().await;
        history
            .iter()
            .filter(|recorded| recorded.subject == subject)
            .cloned()
            .collect()
    };

    // Stable sort, descending by timestamp.
    matching.sort_by_key(|recorded| std::cmp::Reverse(recorded.timestamp));

    if let Some(max_events) = limit {
        matching.truncate(max_events);
    }
    Ok(matching)
}
/// Adds `rule`, replacing any existing rule with the same id, and keeps the rule
/// list sorted by descending priority. Always returns `Ok(())`.
pub async fn add_evaluation_rule(&self, rule: CaepEvaluationRule) -> Result<()> {
    let mut active_rules = self.rules.write().await;
    active_rules.retain(|existing| existing.id != rule.id);
    active_rules.push(rule);
    // Stable sort: rules of equal priority keep their relative order.
    active_rules.sort_by_key(|existing| std::cmp::Reverse(existing.priority));
    Ok(())
}
/// Removes the rule(s) with id `rule_id`; returns `Ok(true)` when at least one
/// rule was removed and `Ok(false)` otherwise.
pub async fn remove_evaluation_rule(&self, rule_id: &str) -> Result<bool> {
    let mut active_rules = self.rules.write().await;
    let count_before = active_rules.len();
    active_rules.retain(|existing| existing.id != rule_id);
    // `retain` can only shrink the list, so any difference means a removal.
    let removed_any = active_rules.len() != count_before;
    Ok(removed_any)
}
/// Returns the OIDC session for `session_id` together with its CAEP state.
///
/// Yields `Ok(None)` when the session manager does not know the session or
/// reports it as invalid.
pub async fn get_comprehensive_session_info(
    &self,
    session_id: &str,
) -> Result<Option<ComprehensiveSessionInfo>> {
    let Some(oidc_session) = self.session_manager.get_session(session_id) else {
        return Ok(None);
    };
    if !self.session_manager.is_session_valid(session_id) {
        return Ok(None);
    }

    // The read guard is a temporary, released at the end of this statement.
    let caep_session = self.sessions.read().await.get(session_id).cloned();

    Ok(Some(ComprehensiveSessionInfo {
        oidc_session: oidc_session.clone(),
        is_monitored_by_caep: caep_session.is_some(),
        caep_session,
    }))
}
/// Returns every valid OIDC session of `subject`, each paired with its CAEP
/// state when one is tracked. Sessions the session manager reports as invalid
/// are skipped.
pub async fn get_subject_sessions(
    &self,
    subject: &str,
) -> Result<Vec<ComprehensiveSessionInfo>> {
    let oidc_sessions = self.session_manager.get_sessions_for_subject(subject);
    let mut result = Vec::new();

    for oidc_session in oidc_sessions {
        let still_valid = self
            .session_manager
            .is_session_valid(&oidc_session.session_id);
        if !still_valid {
            continue;
        }

        // Look up the CAEP state per session; the guard is released right away.
        let caep_session = self
            .sessions
            .read()
            .await
            .get(&oidc_session.session_id)
            .cloned();

        result.push(ComprehensiveSessionInfo {
            oidc_session: oidc_session.clone(),
            is_monitored_by_caep: caep_session.is_some(),
            caep_session,
        });
    }

    Ok(result)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio;

    /// Numeric rank of a severity, mirroring the mapping used by the rule evaluator.
    fn severity_level(severity: CaepEventSeverity) -> i32 {
        match severity {
            CaepEventSeverity::Critical => 4,
            CaepEventSeverity::High => 3,
            CaepEventSeverity::Medium => 2,
            CaepEventSeverity::Low => 1,
        }
    }

    /// An event built by hand keeps the field values it was given.
    #[tokio::test]
    async fn test_caep_event_creation() {
        let source = CaepEventSource {
            system_id: "risk_engine".to_string(),
            source_type: "ml_model".to_string(),
            version: Some("1.0.0".to_string()),
            metadata: HashMap::new(),
        };
        let event_data = serde_json::json!({
            "previous_score": 0.3,
            "new_score": 0.85,
            "trigger": "suspicious_login_pattern"
        });

        let event = CaepEvent {
            id: Uuid::new_v4(),
            event_type: CaepEventType::RiskScoreChange,
            subject: "user123".to_string(),
            severity: CaepEventSeverity::High,
            timestamp: Utc::now(),
            source,
            risk_score: 0.85,
            session_id: Some("session123".to_string()),
            location: None,
            device_info: None,
            event_data,
            correlation_id: None,
        };

        assert_eq!(event.subject, "user123");
        assert_eq!(event.risk_score, 0.85);
        assert!(matches!(event.severity, CaepEventSeverity::High));
    }

    /// The default configuration has a stream URL and auto-revoke at 0.8.
    #[tokio::test]
    async fn test_caep_config_creation() {
        let defaults = CaepConfig::default();

        assert!(!defaults.event_stream_url.is_empty());
        assert!(defaults.auto_revoke);
        assert_eq!(defaults.auto_revoke_threshold, 0.8);
    }

    /// `High` ranks above `Medium`.
    #[tokio::test]
    async fn test_severity_comparison() {
        let high_level = severity_level(CaepEventSeverity::High);
        let medium_level = severity_level(CaepEventSeverity::Medium);

        assert!(high_level > medium_level);
    }
}