use crate::security::{
emit_security_event_async,
events::{EventActor, EventOutcome, EventTarget, SecurityEvent, SecurityEventType},
justification_storage::{AccessJustification, JustificationStorage},
mfa_tracking::MfaStorage,
};
use crate::Error;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use serde_json;
use std::collections::HashMap;
use uuid::Uuid;
/// Role for which privileged access can be requested or a privileged session opened.
///
/// Serialized through serde with `rename_all = "lowercase"`, which lowercases the
/// variant name without inserting separators (`ServiceAccount` becomes
/// `"serviceaccount"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PrivilegedRole {
Admin,
Owner,
ServiceAccount,
}
/// Lifecycle state of a [`PrivilegedAccessRequest`].
///
/// Serialized lowercase without separators (for example `"pendingmanager"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RequestStatus {
/// Initial state assigned by `PrivilegedAccessRequest::new`.
PendingManager,
/// Assigned by `PrivilegedAccessManager::approve_manager`.
PendingSecurity,
/// Assigned by `PrivilegedAccessManager::approve_security`.
Approved,
/// Assigned by `PrivilegedAccessManager::deny_request`.
Denied,
/// NOTE(review): no code in the reviewed section assigns this state.
Cancelled,
}
// Category of a privileged action, stored on `PrivilegedAction::action_type` and
// listed in `PrivilegedAccessConfig::sensitive_actions`.
//
// Serialized in snake_case (for example `user_create`).
//
// NOTE(review): plain `//` comments are used deliberately. With the `schema`
// feature this type derives `schemars::JsonSchema`, and `///` doc comments would
// presumably be copied into the generated schema as descriptions — confirm before
// converting these to doc comments.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
#[serde(rename_all = "snake_case")]
pub enum PrivilegedActionType {
UserCreate,
UserDelete,
UserModify,
RoleAssign,
RoleRevoke,
RoleEscalate,
PermissionGrant,
PermissionRevoke,
ConfigModify,
SecurityPolicyChange,
SecuritySettingChange,
AuditLogAccess,
DataExport,
DataDelete,
Other,
}
/// A user's request to be granted a [`PrivilegedRole`], together with its
/// approval trail and current [`RequestStatus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrivilegedAccessRequest {
/// Identifier of this request; `new` generates a random v4 UUID.
pub request_id: Uuid,
/// User the access is requested for.
pub user_id: Uuid,
/// Role being requested.
pub requested_role: PrivilegedRole,
/// Free-text justification supplied with the request.
pub justification: String,
/// Optional free-text business need supplied with the request.
pub business_need: Option<String>,
/// Approving manager. May be supplied at creation and is overwritten by
/// `PrivilegedAccessManager::approve_manager`.
pub manager_approval: Option<Uuid>,
/// Security approver; `None` until `PrivilegedAccessManager::approve_security` runs.
pub security_approval: Option<Uuid>,
/// Current lifecycle state.
pub status: RequestStatus,
/// Creation time (UTC).
pub created_at: DateTime<Utc>,
/// Time of the last status change (UTC); equals `created_at` on a new request.
pub updated_at: DateTime<Utc>,
/// Expiry of the granted access; `None` until security approval sets it.
pub expires_at: Option<DateTime<Utc>>,
}
impl PrivilegedAccessRequest {
    /// Creates a request in the `PendingManager` state.
    ///
    /// A fresh random `request_id` is generated, both timestamps are set to the
    /// same current UTC instant, and no security approval or expiry is recorded.
    /// `manager_approval` is stored as given.
    pub fn new(
        user_id: Uuid,
        requested_role: PrivilegedRole,
        justification: String,
        business_need: Option<String>,
        manager_approval: Option<Uuid>,
    ) -> Self {
        let created = Utc::now();
        Self {
            request_id: Uuid::new_v4(),
            status: RequestStatus::PendingManager,
            security_approval: None,
            expires_at: None,
            created_at: created,
            updated_at: created,
            user_id,
            requested_role,
            justification,
            business_need,
            manager_approval,
        }
    }

    /// Returns `true` when the status is `Approved`. Expiry is not considered;
    /// see [`Self::is_expired`].
    pub fn is_approved(&self) -> bool {
        matches!(self.status, RequestStatus::Approved)
    }

    /// Returns `true` when an expiry is set and lies strictly in the past.
    /// A request without an expiry never counts as expired.
    pub fn is_expired(&self) -> bool {
        match self.expires_at {
            Some(deadline) => Utc::now() > deadline,
            None => false,
        }
    }
}
/// One recorded privileged action, as stored by
/// `PrivilegedAccessManager::record_action`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrivilegedAction {
/// Identifier of this record; `record_action` generates a random v4 UUID.
pub action_id: Uuid,
/// User who performed the action.
pub user_id: Uuid,
/// Category of the action.
pub action_type: PrivilegedActionType,
/// Optional caller-supplied resource; `record_action` places it in the
/// security event's target `method` field.
pub resource: Option<String>,
/// Optional caller-supplied details; `record_action` uses it as the event
/// outcome's reason.
pub details: Option<String>,
/// Optional caller-supplied client IP address.
pub ip_address: Option<String>,
/// Optional caller-supplied client user agent.
pub user_agent: Option<String>,
/// Optional caller-supplied session identifier.
pub session_id: Option<String>,
/// Time the record was created (UTC).
pub timestamp: DateTime<Utc>,
}
/// A privileged session tracked by `PrivilegedAccessManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrivilegedSession {
/// Caller-supplied identifier; also the key in the manager's session map.
pub session_id: String,
/// User who owns the session.
pub user_id: Uuid,
/// Role the session was opened with.
pub role: PrivilegedRole,
/// Time the session was started (UTC).
pub started_at: DateTime<Utc>,
/// Time of the last recorded activity (UTC); refreshed by
/// `update_session_activity` and compared against the timeout in
/// `cleanup_expired_sessions`.
pub last_activity: DateTime<Utc>,
/// Optional client IP address given when the session was started.
pub ip_address: Option<String>,
/// Optional client user agent given when the session was started.
pub user_agent: Option<String>,
/// `true` from start until `end_session` or `cleanup_expired_sessions`
/// clears it; the entry itself stays in the map.
pub is_active: bool,
}
// Settings for `PrivilegedAccessManager`.
//
// NOTE(review): plain `//` comments are used deliberately. With the `schema`
// feature this type derives `schemars::JsonSchema`, and `///` doc comments would
// presumably be copied into the generated schema as descriptions — confirm before
// converting these to doc comments.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
pub struct PrivilegedAccessConfig {
// When false, `check_mfa_compliance` reports every user as compliant.
pub require_mfa: bool,
// NOTE(review): not read by any code in the reviewed section.
pub mfa_grace_period_days: u64,
// When true, `start_session` refuses users that fail the MFA check.
pub auto_suspend_no_mfa: bool,
// Inactivity limit, in minutes, applied by `cleanup_expired_sessions`.
pub session_timeout_minutes: u64,
// Upper bound on simultaneously active sessions per user, enforced by `start_session`.
pub max_concurrent_sessions: u32,
// NOTE(review): not read by any code in the reviewed section.
pub record_sensitive_actions: bool,
// NOTE(review): not read by any code in the reviewed section.
pub monitor_activity: bool,
// Action types that `record_action` reports as `AuthzPrivilegeEscalation`
// events; all others are reported as `AuthzAccessGranted`.
pub sensitive_actions: Vec<PrivilegedActionType>,
}
impl Default for PrivilegedAccessConfig {
fn default() -> Self {
Self {
require_mfa: true,
mfa_grace_period_days: 7,
auto_suspend_no_mfa: true,
session_timeout_minutes: 30,
max_concurrent_sessions: 2,
record_sensitive_actions: true,
monitor_activity: true,
sensitive_actions: vec![
PrivilegedActionType::UserDelete,
PrivilegedActionType::RoleEscalate,
PrivilegedActionType::SecurityPolicyChange,
PrivilegedActionType::DataExport,
PrivilegedActionType::AuditLogAccess,
],
}
}
}
/// Coordinates privileged access requests, privileged sessions and the log of
/// privileged actions, keeping all three in in-process collections behind
/// async read/write locks.
pub struct PrivilegedAccessManager {
// Policy settings consulted by the methods below.
config: PrivilegedAccessConfig,
// Optional MFA status backend used by `check_mfa_compliance`.
mfa_storage: Option<Arc<dyn MfaStorage>>,
// Optional backend that receives an `AccessJustification` on security approval.
justification_storage: Option<Arc<dyn JustificationStorage>>,
// Sessions keyed by their caller-supplied session id.
sessions: Arc<RwLock<HashMap<String, PrivilegedSession>>>,
// Recorded actions in insertion order.
actions: Arc<RwLock<Vec<PrivilegedAction>>>,
// Access requests keyed by request id.
requests: Arc<RwLock<HashMap<Uuid, PrivilegedAccessRequest>>>,
}
impl PrivilegedAccessManager {
/// Builds a manager with the given configuration and optional storage
/// backends. Sessions, actions and requests all start out empty.
pub fn new(
    config: PrivilegedAccessConfig,
    mfa_storage: Option<Arc<dyn MfaStorage>>,
    justification_storage: Option<Arc<dyn JustificationStorage>>,
) -> Self {
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    let actions = Arc::new(RwLock::new(Vec::new()));
    let requests = Arc::new(RwLock::new(HashMap::new()));

    Self {
        config,
        mfa_storage,
        justification_storage,
        sessions,
        actions,
        requests,
    }
}
/// Creates a new access request in the `PendingManager` state, stores it
/// under its generated id and returns a copy of it.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn request_privileged_access(
    &self,
    user_id: Uuid,
    requested_role: PrivilegedRole,
    justification: String,
    business_need: Option<String>,
    manager_approval: Option<Uuid>,
) -> Result<PrivilegedAccessRequest, Error> {
    let created = PrivilegedAccessRequest::new(
        user_id,
        requested_role,
        justification,
        business_need,
        manager_approval,
    );

    let mut store = self.requests.write().await;
    store.insert(created.request_id, created.clone());

    Ok(created)
}
/// Records manager approval for a request and advances it to
/// `PendingSecurity`, then emits an `AuthzPrivilegeEscalation` security
/// event with the approver as actor.
///
/// # Errors
///
/// * not-found error when `request_id` is unknown;
/// * invalid-state error when the request is not `PendingManager`.
pub async fn approve_manager(&self, request_id: Uuid, approver_id: Uuid) -> Result<(), Error> {
    let mut store = self.requests.write().await;
    let entry = store
        .get_mut(&request_id)
        .ok_or_else(|| Error::not_found("PrivilegedAccessRequest", request_id.to_string()))?;

    if entry.status != RequestStatus::PendingManager {
        return Err(Error::invalid_state("Request is not pending manager approval"));
    }

    entry.manager_approval = Some(approver_id);
    entry.status = RequestStatus::PendingSecurity;
    entry.updated_at = Utc::now();

    let actor = EventActor {
        user_id: Some(approver_id.to_string()),
        username: None,
        ip_address: None,
        user_agent: None,
    };
    let target = EventTarget {
        resource_type: Some("privileged_access_request".to_string()),
        resource_id: Some(request_id.to_string()),
        method: None,
    };
    let outcome = EventOutcome {
        success: true,
        reason: Some("Manager approval granted".to_string()),
    };
    let requester = serde_json::json!(entry.user_id.to_string());
    let role = serde_json::json!(format!("{:?}", entry.requested_role));

    let event = SecurityEvent::new(SecurityEventType::AuthzPrivilegeEscalation, None, None)
        .with_actor(actor)
        .with_target(target)
        .with_outcome(outcome)
        .with_metadata("request_user_id".to_string(), requester)
        .with_metadata("requested_role".to_string(), role);
    emit_security_event_async(event);

    Ok(())
}
pub async fn approve_security(
&self,
request_id: Uuid,
approver_id: Uuid,
expiration_days: u64,
) -> Result<(), Error> {
let mut requests = self.requests.write().await;
let request = requests
.get_mut(&request_id)
.ok_or_else(|| Error::not_found("PrivilegedAccessRequest", request_id.to_string()))?;
if request.status != RequestStatus::PendingSecurity {
return Err(Error::invalid_state("Request is not pending security approval"));
}
request.security_approval = Some(approver_id);
request.status = RequestStatus::Approved;
request.expires_at = Some(Utc::now() + Duration::days(expiration_days as i64));
request.updated_at = Utc::now();
if let Some(ref just_storage) = self.justification_storage {
let justification = AccessJustification::new(
request.user_id,
request.justification.clone(),
request.business_need.clone(),
request.manager_approval,
request.expires_at,
);
just_storage.set_justification(justification).await?;
}
let event = SecurityEvent::new(SecurityEventType::AuthzPrivilegeEscalation, None, None)
.with_actor(EventActor {
user_id: Some(approver_id.to_string()),
username: None,
ip_address: None,
user_agent: None,
})
.with_target(EventTarget {
resource_type: Some("privileged_access_request".to_string()),
resource_id: Some(request_id.to_string()),
method: None,
})
.with_outcome(EventOutcome {
success: true,
reason: Some("Security approval granted".to_string()),
})
.with_metadata(
"request_user_id".to_string(),
serde_json::json!(request.user_id.to_string()),
)
.with_metadata(
"requested_role".to_string(),
serde_json::json!(format!("{:?}", request.requested_role)),
)
.with_metadata("expiration_days".to_string(), serde_json::json!(expiration_days));
emit_security_event_async(event);
Ok(())
}
pub async fn deny_request(&self, request_id: Uuid, reason: String) -> Result<(), Error> {
let mut requests = self.requests.write().await;
let request = requests
.get_mut(&request_id)
.ok_or_else(|| Error::not_found("PrivilegedAccessRequest", request_id.to_string()))?;
let user_id = request.user_id;
request.status = RequestStatus::Denied;
request.updated_at = Utc::now();
let event = SecurityEvent::new(SecurityEventType::AuthzAccessDenied, None, None)
.with_actor(EventActor {
user_id: Some(user_id.to_string()),
username: None,
ip_address: None,
user_agent: None,
})
.with_target(EventTarget {
resource_type: Some("privileged_access_request".to_string()),
resource_id: Some(request_id.to_string()),
method: None,
})
.with_outcome(EventOutcome {
success: false,
reason: Some(reason.clone()),
})
.with_metadata(
"requested_role".to_string(),
serde_json::json!(format!("{:?}", request.requested_role)),
);
emit_security_event_async(event);
Ok(())
}
pub async fn check_mfa_compliance(&self, user_id: Uuid) -> Result<bool, Error> {
if !self.config.require_mfa {
return Ok(true);
}
if let Some(ref mfa_storage) = self.mfa_storage {
let mfa_status = mfa_storage.get_mfa_status(user_id).await?;
Ok(mfa_status.map(|s| s.enabled).unwrap_or(false))
} else {
Ok(true)
}
}
/// Appends a privileged action to the action log and emits a matching
/// security event; returns the generated action id.
///
/// The event type is `AuthzPrivilegeEscalation` when `action_type` is listed
/// in the configured sensitive actions and `AuthzAccessGranted` otherwise.
/// `resource` is reported in the event target's `method` field, `details`
/// as the outcome reason, and a missing `session_id` as an empty string.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
// The argument list mirrors the fields of `PrivilegedAction`.
#[allow(clippy::too_many_arguments)]
pub async fn record_action(
    &self,
    user_id: Uuid,
    action_type: PrivilegedActionType,
    resource: Option<String>,
    details: Option<String>,
    ip_address: Option<String>,
    user_agent: Option<String>,
    session_id: Option<String>,
) -> Result<Uuid, Error> {
    let entry = PrivilegedAction {
        action_id: Uuid::new_v4(),
        user_id,
        action_type,
        resource,
        details,
        ip_address,
        user_agent,
        session_id,
        timestamp: Utc::now(),
    };
    let action_id = entry.action_id;

    // The guard is held until return, so log order and event order agree.
    let mut log = self.actions.write().await;

    let is_sensitive = self.config.sensitive_actions.contains(&action_type);
    let event_type = if is_sensitive {
        SecurityEventType::AuthzPrivilegeEscalation
    } else {
        SecurityEventType::AuthzAccessGranted
    };

    let actor = EventActor {
        user_id: Some(entry.user_id.to_string()),
        username: None,
        ip_address: entry.ip_address.clone(),
        user_agent: entry.user_agent.clone(),
    };
    let target = EventTarget {
        resource_type: Some(format!("privileged_action_{:?}", action_type)),
        resource_id: Some(action_id.to_string()),
        method: entry.resource.clone(),
    };
    let outcome = EventOutcome {
        success: true,
        reason: entry.details.clone(),
    };
    let session_label = entry.session_id.clone().unwrap_or_default();

    let event = SecurityEvent::new(event_type, None, None)
        .with_actor(actor)
        .with_target(target)
        .with_outcome(outcome)
        .with_metadata(
            "action_type".to_string(),
            serde_json::json!(format!("{:?}", action_type)),
        )
        .with_metadata("session_id".to_string(), serde_json::json!(session_label));

    // Everything the event needs has been copied out, so the record itself
    // can be moved into the log.
    log.push(entry);
    emit_security_event_async(event);

    Ok(action_id)
}
/// Registers a new active privileged session under `session_id` and emits
/// an `AuthzPrivilegeEscalation` security event.
///
/// The concurrent-session limit is checked and the session inserted under a
/// single write lock, so concurrent callers cannot jointly exceed
/// `max_concurrent_sessions` for the same user.
///
/// # Errors
///
/// * invalid-state error when the user fails the MFA check and
///   `auto_suspend_no_mfa` is enabled;
/// * invalid-state error when the user already has
///   `max_concurrent_sessions` active sessions;
/// * any error returned by the MFA check.
pub async fn start_session(
    &self,
    session_id: String,
    user_id: Uuid,
    role: PrivilegedRole,
    ip_address: Option<String>,
    user_agent: Option<String>,
) -> Result<(), Error> {
    if !self.check_mfa_compliance(user_id).await? && self.config.auto_suspend_no_mfa {
        return Err(Error::invalid_state("MFA not enabled for privileged user"));
    }

    // Take the write lock before counting: checking under a read lock and
    // inserting under a later write lock would let two callers both pass.
    let mut sessions = self.sessions.write().await;
    let active_sessions =
        sessions.values().filter(|s| s.user_id == user_id && s.is_active).count();
    if active_sessions >= self.config.max_concurrent_sessions as usize {
        return Err(Error::invalid_state("Maximum concurrent sessions reached"));
    }

    let now = Utc::now();
    let session = PrivilegedSession {
        session_id: session_id.clone(),
        user_id,
        role,
        started_at: now,
        last_activity: now,
        ip_address: ip_address.clone(),
        user_agent: user_agent.clone(),
        is_active: true,
    };
    sessions.insert(session_id.clone(), session);

    let event = SecurityEvent::new(SecurityEventType::AuthzPrivilegeEscalation, None, None)
        .with_actor(EventActor {
            user_id: Some(user_id.to_string()),
            username: None,
            ip_address,
            user_agent,
        })
        .with_target(EventTarget {
            resource_type: Some("privileged_session".to_string()),
            resource_id: Some(session_id),
            method: Some(format!("{:?}", role)),
        })
        .with_outcome(EventOutcome {
            success: true,
            reason: Some("Privileged session started".to_string()),
        })
        .with_metadata("role".to_string(), serde_json::json!(format!("{:?}", role)));
    emit_security_event_async(event);
    Ok(())
}
/// Sets the session's `last_activity` to the current UTC time.
///
/// An unknown `session_id` is ignored. The `is_active` flag is neither
/// checked nor changed.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn update_session_activity(&self, session_id: &str) -> Result<(), Error> {
    let mut table = self.sessions.write().await;
    let entry = match table.get_mut(session_id) {
        Some(entry) => entry,
        None => return Ok(()),
    };
    entry.last_activity = Utc::now();
    Ok(())
}
pub async fn end_session(&self, session_id: &str) -> Result<(), Error> {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.get_mut(session_id) {
let user_id = session.user_id;
let role = session.role;
session.is_active = false;
let event = SecurityEvent::new(SecurityEventType::AuthzAccessGranted, None, None)
.with_actor(EventActor {
user_id: Some(user_id.to_string()),
username: None,
ip_address: session.ip_address.clone(),
user_agent: session.user_agent.clone(),
})
.with_target(EventTarget {
resource_type: Some("privileged_session".to_string()),
resource_id: Some(session_id.to_string()),
method: Some(format!("{:?}", role)),
})
.with_outcome(EventOutcome {
success: true,
reason: Some("Privileged session ended".to_string()),
})
.with_metadata("role".to_string(), serde_json::json!(format!("{:?}", role)))
.with_metadata(
"duration_seconds".to_string(),
serde_json::json!((Utc::now() - session.started_at).num_seconds()),
);
emit_security_event_async(event);
}
Ok(())
}
/// Deactivates every active session whose last activity is older than the
/// configured timeout and returns the ids of the sessions it deactivated.
///
/// Session entries stay in the map; only `is_active` is cleared. A
/// configured timeout too large for chrono to represent is clamped to the
/// largest representable duration, which in practice means no session
/// expires.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn cleanup_expired_sessions(&self) -> Result<Vec<String>, Error> {
    // chrono durations are limited to i64::MAX milliseconds; a plain
    // `as i64` cast could wrap negative and `Duration::minutes` panics when
    // out of range, so clamp before converting.
    const MAX_DURATION_MINUTES: u64 = (i64::MAX / 60_000) as u64;
    let minutes = self.config.session_timeout_minutes.min(MAX_DURATION_MINUTES);
    let timeout = Duration::minutes(minutes as i64);

    let now = Utc::now();
    let mut sessions = self.sessions.write().await;
    let expired = sessions
        .iter_mut()
        .filter(|(_, session)| session.is_active && (now - session.last_activity) > timeout)
        .map(|(session_id, session)| {
            session.is_active = false;
            session_id.clone()
        })
        .collect();
    Ok(expired)
}
/// Returns copies of all recorded actions performed by `user_id`, in the
/// order they were recorded.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn get_user_actions(&self, user_id: Uuid) -> Result<Vec<PrivilegedAction>, Error> {
    let log = self.actions.read().await;
    let mut matching = Vec::new();
    for entry in log.iter() {
        if entry.user_id == user_id {
            matching.push(entry.clone());
        }
    }
    Ok(matching)
}
/// Returns copies of all sessions whose `is_active` flag is set, in the
/// map's iteration order.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn get_active_sessions(&self) -> Result<Vec<PrivilegedSession>, Error> {
    let table = self.sessions.read().await;
    let mut active = Vec::new();
    for session in table.values() {
        if session.is_active {
            active.push(session.clone());
        }
    }
    Ok(active)
}
/// Looks up a request by id and returns a copy of it, or `None` when the id
/// is unknown.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn get_request(
    &self,
    request_id: Uuid,
) -> Result<Option<PrivilegedAccessRequest>, Error> {
    let store = self.requests.read().await;
    let found = match store.get(&request_id) {
        Some(request) => Some(request.clone()),
        None => None,
    };
    Ok(found)
}
/// Returns copies of all requests submitted for `user_id`, in the map's
/// iteration order.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn get_user_requests(
    &self,
    user_id: Uuid,
) -> Result<Vec<PrivilegedAccessRequest>, Error> {
    let store = self.requests.read().await;
    let mut matching = Vec::new();
    for request in store.values() {
        if request.user_id == user_id {
            matching.push(request.clone());
        }
    }
    Ok(matching)
}
}
use std::sync::Arc;
use tokio::sync::RwLock;
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly submitted request starts in `PendingManager` and keeps the
    /// manager approval id that was supplied with it.
    #[tokio::test]
    async fn test_privileged_access_request() {
        let access_manager =
            PrivilegedAccessManager::new(PrivilegedAccessConfig::default(), None, None);
        let requester = Uuid::new_v4();
        let approving_manager = Uuid::new_v4();

        let submitted = access_manager
            .request_privileged_access(
                requester,
                PrivilegedRole::Admin,
                "Required for system administration".to_string(),
                Some("Manage production infrastructure".to_string()),
                Some(approving_manager),
            )
            .await;
        let request = submitted.unwrap();

        assert_eq!(request.status, RequestStatus::PendingManager);
        assert!(request.manager_approval.is_some());
    }
}