use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sea_orm::ConnectionTrait;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use tokio::sync::Mutex;
use uuid::Uuid;
/// Kind of action an audit event records.
///
/// The `Display` implementation in this file renders every variant as an
/// upper-case label (for example `PermissionChange` becomes `PERMISSION_CHANGE`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AuditOperation {
/// Displayed as `CREATE`.
Create,
/// Displayed as `READ`.
Read,
/// Displayed as `UPDATE`.
Update,
/// Displayed as `DELETE`.
Delete,
/// Displayed as `LOGIN`.
Login,
/// Displayed as `LOGOUT`.
Logout,
/// Displayed as `PERMISSION_CHANGE`.
PermissionChange,
/// Displayed as `CONFIG_CHANGE`.
ConfigChange,
/// Any other operation, identified by a free-form label; displayed upper-cased.
Other(String),
}
impl fmt::Display for AuditOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AuditOperation::Create => write!(f, "CREATE"),
AuditOperation::Read => write!(f, "READ"),
AuditOperation::Update => write!(f, "UPDATE"),
AuditOperation::Delete => write!(f, "DELETE"),
AuditOperation::Login => write!(f, "LOGIN"),
AuditOperation::Logout => write!(f, "LOGOUT"),
AuditOperation::PermissionChange => write!(f, "PERMISSION_CHANGE"),
AuditOperation::ConfigChange => write!(f, "CONFIG_CHANGE"),
AuditOperation::Other(s) => write!(f, "{}", s.to_uppercase()),
}
}
}
impl Default for AuditOperation {
fn default() -> Self {
AuditOperation::Other("UNKNOWN".to_string())
}
}
/// Severity level attached to an audit event.
///
/// `Info` is the `Default`. The `Display` implementation in this file renders
/// each variant as its upper-case name.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub enum AuditSeverity {
/// Default severity; displayed as `INFO`.
#[default]
Info,
/// Displayed as `LOW`.
Low,
/// Displayed as `MEDIUM`.
Medium,
/// Displayed as `HIGH`.
High,
/// Displayed as `CRITICAL`.
Critical,
}
impl fmt::Display for AuditSeverity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AuditSeverity::Info => write!(f, "INFO"),
AuditSeverity::Low => write!(f, "LOW"),
AuditSeverity::Medium => write!(f, "MEDIUM"),
AuditSeverity::High => write!(f, "HIGH"),
AuditSeverity::Critical => write!(f, "CRITICAL"),
}
}
}
/// Outcome recorded for an audited operation.
///
/// `Success` is the `Default`. The `Display` implementation in this file renders
/// each variant as its upper-case name.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub enum AuditResult {
/// Default outcome; displayed as `SUCCESS`.
#[default]
Success,
/// Displayed as `FAILURE`.
Failure,
/// Displayed as `PARTIAL`.
Partial,
/// Displayed as `UNKNOWN`.
Unknown,
}
impl fmt::Display for AuditResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AuditResult::Success => write!(f, "SUCCESS"),
AuditResult::Failure => write!(f, "FAILURE"),
AuditResult::Partial => write!(f, "PARTIAL"),
AuditResult::Unknown => write!(f, "UNKNOWN"),
}
}
}
/// A single audit-trail record.
///
/// Serializable with serde; `to_json` / `from_json` round-trip it through JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEvent {
/// Event identifier; `AuditEvent::new` fills it with a random UUID v4 string.
pub id: String,
/// Time of the event; `AuditEvent::new` sets it to `Utc::now()`.
pub timestamp: DateTime<Utc>,
/// What was done.
pub operation: AuditOperation,
/// Type/category of the entity the operation targeted.
pub entity_type: String,
/// Identifier of the targeted entity.
pub entity_id: String,
/// Identifier of the acting user.
pub user_id: String,
/// Role of the acting user; empty when built through the `create`/`read`/`update`/`delete` helpers.
pub user_role: String,
/// Client IP address as text; empty when built through the `create`/`read`/`update`/`delete` helpers.
pub client_ip: String,
/// Severity of the event; `AuditEvent::new` starts at `Info`.
pub severity: AuditSeverity,
/// Outcome of the operation; `AuditEvent::new` starts at `Success`.
pub result: AuditResult,
/// Optional textual snapshot of the state before the operation.
pub before_value: Option<String>,
/// Optional textual snapshot of the state after the operation.
pub after_value: Option<String>,
/// Optional free-form detail; `with_error` stores the error message here.
pub extra: Option<String>,
/// Request correlation id; `AuditEvent::new` generates a random UUID v4 string.
pub request_id: String,
/// Session identifier; empty unless set via `with_session_id`.
pub session_id: String,
}
impl AuditEvent {
    /// Field names redacted by [`AuditEvent::sanitize_value`] when the caller passes no list.
    const DEFAULT_SENSITIVE_FIELDS: [&'static str; 12] = [
        "password",
        "token",
        "secret",
        "key",
        "credential",
        "api_key",
        "access_token",
        "refresh_token",
        "private_key",
        "credit_card",
        "ssn",
        "social_security",
    ];

    /// Placeholder written in place of a redacted value.
    const REDACTED: &'static str = "***REDACTED***";

    /// Creates an event with a fresh UUID `id`, the current UTC time, severity
    /// `Info`, result `Success`, no before/after/extra payload, a fresh UUID
    /// `request_id` and an empty `session_id`.
    pub fn new(
        operation: AuditOperation,
        entity_type: &str,
        entity_id: &str,
        user_id: &str,
        user_role: &str,
        client_ip: &str,
    ) -> Self {
        Self {
            id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            operation,
            entity_type: entity_type.to_string(),
            entity_id: entity_id.to_string(),
            user_id: user_id.to_string(),
            user_role: user_role.to_string(),
            client_ip: client_ip.to_string(),
            severity: AuditSeverity::Info,
            result: AuditResult::Success,
            before_value: None,
            after_value: None,
            extra: None,
            request_id: Uuid::new_v4().to_string(),
            session_id: String::new(),
        }
    }

    /// Returns a new `AuditEventBuilder`.
    pub fn builder() -> AuditEventBuilder {
        AuditEventBuilder::new()
    }

    /// Shorthand for a `Create` event with empty role and client IP.
    pub fn create(entity_type: &str, entity_id: &str, user_id: &str) -> Self {
        Self::new(AuditOperation::Create, entity_type, entity_id, user_id, "", "")
    }

    /// Shorthand for a `Read` event with empty role and client IP.
    pub fn read(entity_type: &str, entity_id: &str, user_id: &str) -> Self {
        Self::new(AuditOperation::Read, entity_type, entity_id, user_id, "", "")
    }

    /// Shorthand for an `Update` event carrying the optional before/after snapshots.
    pub fn update(
        entity_type: &str,
        entity_id: &str,
        user_id: &str,
        before: Option<String>,
        after: Option<String>,
    ) -> Self {
        Self {
            before_value: before,
            after_value: after,
            ..Self::new(AuditOperation::Update, entity_type, entity_id, user_id, "", "")
        }
    }

    /// Shorthand for a `Delete` event with empty role and client IP.
    pub fn delete(entity_type: &str, entity_id: &str, user_id: &str) -> Self {
        Self::new(AuditOperation::Delete, entity_type, entity_id, user_id, "", "")
    }

    /// Sets the acting user's role and client IP.
    pub fn with_user(mut self, role: &str, client_ip: &str) -> Self {
        self.user_role = role.to_string();
        self.client_ip = client_ip.to_string();
        self
    }

    /// Sets the outcome.
    pub fn with_result(mut self, result: AuditResult) -> Self {
        self.result = result;
        self
    }

    /// Marks the event as `Failure` and stores `error` in `extra`.
    pub fn with_error(mut self, error: &str) -> Self {
        self.result = AuditResult::Failure;
        self.extra = Some(error.to_string());
        self
    }

    /// Sets the severity.
    pub fn with_severity(mut self, severity: AuditSeverity) -> Self {
        self.severity = severity;
        self
    }

    /// Sets the free-form `extra` payload.
    pub fn with_extra(mut self, extra: &str) -> Self {
        self.extra = Some(extra.to_string());
        self
    }

    /// Sets the "before" snapshot.
    pub fn with_before_value(mut self, value: &str) -> Self {
        self.before_value = Some(value.to_string());
        self
    }

    /// Sets the "after" snapshot.
    pub fn with_after_value(mut self, value: &str) -> Self {
        self.after_value = Some(value.to_string());
        self
    }

    /// Overrides the generated request id.
    pub fn with_request_id(mut self, request_id: &str) -> Self {
        self.request_id = request_id.to_string();
        self
    }

    /// Sets the session id.
    pub fn with_session_id(mut self, session_id: &str) -> Self {
        self.session_id = session_id.to_string();
        self
    }

    /// Serializes the event to a JSON string.
    ///
    /// # Errors
    /// Returns the `serde_json` error if serialization fails.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string(self)
    }

    /// Parses an event from a JSON string.
    ///
    /// # Errors
    /// Returns the `serde_json` error if `json` is not a valid event.
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
        serde_json::from_str(json)
    }

    /// Redacts sensitive data from a textual payload.
    ///
    /// * If `value` is a JSON object, the value of every key that matches a
    ///   sensitive field name (ASCII case-insensitive), at any nesting depth
    ///   inside that object, is replaced with `***REDACTED***`. Keys that are
    ///   not present are **not** added. The object is re-serialized compactly.
    /// * Otherwise, if the text contains `"<field>":` (or `"<field>" :`) for any
    ///   sensitive field, the whole value is replaced with `***REDACTED***`.
    /// * Otherwise the value is returned unchanged.
    ///
    /// `sensitive_fields` defaults to a built-in list (password, token, secret,
    /// key, credential, api_key, access_token, refresh_token, private_key,
    /// credit_card, ssn, social_security) when `None`.
    pub fn sanitize_value(value: &str, sensitive_fields: Option<Vec<String>>) -> String {
        let fields = sensitive_fields.unwrap_or_else(|| {
            Self::DEFAULT_SENSITIVE_FIELDS
                .iter()
                .map(|name| name.to_string())
                .collect()
        });

        if let Ok(serde_json::Value::Object(mut object)) = serde_json::from_str::<serde_json::Value>(value) {
            Self::redact_object(&mut object, &fields);
            return serde_json::to_string(&object).unwrap_or_else(|_| "***SANITIZATION_ERROR***".to_string());
        }

        // Not a JSON object: fall back to a coarse textual check and redact the
        // whole payload if it appears to carry a sensitive key.
        let lower = value.to_lowercase();
        let mentions_sensitive_key = fields.iter().any(|field| {
            let field = field.to_lowercase();
            lower.contains(&format!("\"{}\":", field)) || lower.contains(&format!("\"{}\" :", field))
        });
        if mentions_sensitive_key {
            return Self::REDACTED.to_string();
        }
        value.to_string()
    }

    /// Returns a copy of the event whose `before_value`, `after_value` and
    /// `extra` have been passed through [`AuditEvent::sanitize_value`] with the
    /// default sensitive-field list.
    pub fn sanitized(&self) -> Self {
        let scrub = |payload: &Option<String>| payload.as_deref().map(|text| Self::sanitize_value(text, None));
        Self {
            before_value: scrub(&self.before_value),
            after_value: scrub(&self.after_value),
            extra: scrub(&self.extra),
            ..self.clone()
        }
    }

    /// Replaces the value of every sensitive key in `object`, descending into
    /// nested objects and arrays for the remaining keys.
    fn redact_object(object: &mut serde_json::Map<String, serde_json::Value>, fields: &[String]) {
        for (key, value) in object.iter_mut() {
            if let Some(field) = fields.iter().find(|field| field.eq_ignore_ascii_case(key)) {
                tracing::debug!("Sensitive field '{}' redacted in audit log", field);
                *value = serde_json::Value::String(Self::REDACTED.to_string());
            } else {
                Self::redact_nested(value, fields);
            }
        }
    }

    /// Recurses into containers looking for objects that hold sensitive keys.
    fn redact_nested(value: &mut serde_json::Value, fields: &[String]) {
        match value {
            serde_json::Value::Object(object) => Self::redact_object(object, fields),
            serde_json::Value::Array(items) => {
                for item in items.iter_mut() {
                    Self::redact_nested(item, fields);
                }
            }
            _ => {}
        }
    }
}
/// Settings for the audit logger.
#[derive(Debug, Clone)]
pub struct AuditConfig {
/// Master switch; when `false`, `AuditLogger::log` returns `Ok(())` without storing anything.
pub enabled: bool,
/// Optional storage location. NOTE(review): not read by the code visible in this file — confirm its consumer.
pub storage_path: Option<String>,
/// NOTE(review): presumably selects synchronous writes; not read by the code visible in this file.
pub sync_write: bool,
/// Size limit in bytes (default 10 MiB). NOTE(review): not read by the code visible in this file.
pub max_file_size: u64,
/// Retention count (default 7). NOTE(review): unit and consumer are not visible in this file.
pub retention_count: u32,
/// Field names the logger treats as sensitive when sanitizing event payloads.
pub sensitive_fields: Vec<String>,
/// Operations for which the logger raises an alert after a successful store.
pub alert_operations: Vec<AuditOperation>,
/// Alert severity setting. NOTE(review): not consulted by the code visible in this file.
pub alert_severity: AuditSeverity,
/// Number of retries after a failed store; the logger falls back to 3 when `None`.
pub max_retries: Option<u32>,
}
impl Default for AuditConfig {
    /// Enabled logging, no storage path, asynchronous writes, a 10 MiB size
    /// limit, retention of 7, four common sensitive field names, alerts on
    /// delete/permission/config changes, `High` alert severity and 3 retries.
    fn default() -> Self {
        const TEN_MIB: u64 = 10 * 1024 * 1024;

        let sensitive_fields = ["password", "token", "secret", "api_key"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let alert_operations = vec![
            AuditOperation::Delete,
            AuditOperation::PermissionChange,
            AuditOperation::ConfigChange,
        ];

        Self {
            enabled: true,
            storage_path: None,
            sync_write: false,
            max_file_size: TEN_MIB,
            retention_count: 7,
            sensitive_fields,
            alert_operations,
            alert_severity: AuditSeverity::High,
            max_retries: Some(3),
        }
    }
}
/// Backend that persists and retrieves audit events.
///
/// Implementations must be `Send + Sync` so they can be shared behind an `Arc`.
#[async_trait]
pub trait AuditStorage: Send + Sync {
/// Persists a single event.
async fn store(&self, event: &AuditEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
/// Persists several events.
///
/// The default implementation calls `store` for each event in order and stops
/// at the first error, so earlier events may already have been stored.
async fn store_batch(&self, events: &[AuditEvent]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
for event in events {
self.store(event).await?;
}
Ok(())
}
/// Returns the stored events selected by `filters`.
async fn query(
&self,
filters: &AuditQueryFilters,
) -> Result<Vec<AuditEvent>, Box<dyn std::error::Error + Send + Sync>>;
/// Removes events older than the `before` cutoff and returns how many were removed.
///
/// NOTE(review): whether an event stamped exactly at `before` is removed differs
/// between the implementations in this file.
async fn cleanup(&self, before: &DateTime<Utc>) -> Result<u64, Box<dyn std::error::Error + Send + Sync>>;
}
/// Optional criteria for querying audit events.
///
/// A `None` field places no constraint. The storages in this file that implement
/// filtering require every set field to match.
#[derive(Debug, Default)]
pub struct AuditQueryFilters {
/// Match events by this exact user id.
pub user_id: Option<String>,
/// Match events by this exact entity type.
pub entity_type: Option<String>,
/// Match events with this operation.
pub operation: Option<AuditOperation>,
/// Match events at or after this instant.
pub start_time: Option<DateTime<Utc>>,
/// Match events at or before this instant.
pub end_time: Option<DateTime<Utc>>,
/// Match events with this severity.
pub severity: Option<AuditSeverity>,
/// Match events with this outcome.
pub result: Option<AuditResult>,
}
/// Bounded in-memory audit storage.
///
/// Events are kept in arrival order. Once `max_events` is reached, storing a new
/// event evicts the oldest one and increments the dropped counter.
#[derive(Debug)]
pub struct MemoryAuditStorage {
    /// Ring of retained events, oldest at the front. A `VecDeque` makes the
    /// eviction of the oldest entry O(1) instead of shifting the whole buffer.
    events: Mutex<std::collections::VecDeque<AuditEvent>>,
    /// Maximum number of retained events (never zero).
    max_events: usize,
    /// Number of events evicted to make room for newer ones.
    dropped_count: AtomicU64,
}

impl Default for MemoryAuditStorage {
    /// Creates a storage that retains up to 10 000 events.
    fn default() -> Self {
        Self::new(Self::DEFAULT_MAX_EVENTS)
    }
}

impl MemoryAuditStorage {
    /// Capacity used by `Default` and when `new` is called with `0`.
    const DEFAULT_MAX_EVENTS: usize = 10_000;

    /// Upper bound on eagerly reserved slots; larger limits grow on demand so a
    /// huge `max_events` does not allocate (or abort) up front.
    const MAX_PREALLOCATED_EVENTS: usize = 10_000;

    /// Creates a storage retaining at most `max_events` events.
    ///
    /// A `max_events` of `0` is replaced by the default of 10 000.
    pub fn new(max_events: usize) -> Self {
        let max_events = if max_events == 0 {
            Self::DEFAULT_MAX_EVENTS
        } else {
            max_events
        };
        let preallocated = max_events.min(Self::MAX_PREALLOCATED_EVENTS);
        Self {
            events: Mutex::new(std::collections::VecDeque::with_capacity(preallocated)),
            max_events,
            dropped_count: AtomicU64::new(0),
        }
    }

    /// Number of events evicted so far because the storage was full.
    pub fn dropped_count(&self) -> u64 {
        self.dropped_count.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Number of events currently retained.
    pub async fn event_count(&self) -> usize {
        self.events.lock().await.len()
    }
}

#[async_trait]
impl AuditStorage for MemoryAuditStorage {
    /// Appends a copy of `event`, evicting the oldest event first when full.
    async fn store(&self, event: &AuditEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut events = self.events.lock().await;
        if events.len() >= self.max_events {
            events.pop_front();
            self.dropped_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
        events.push_back(event.clone());
        Ok(())
    }

    /// Returns copies of the retained events matching every set filter, oldest
    /// first. Time bounds are inclusive.
    async fn query(
        &self,
        filters: &AuditQueryFilters,
    ) -> Result<Vec<AuditEvent>, Box<dyn std::error::Error + Send + Sync>> {
        let matches = |event: &AuditEvent| -> bool {
            filters.user_id.as_ref().map_or(true, |wanted| event.user_id == *wanted)
                && filters
                    .entity_type
                    .as_ref()
                    .map_or(true, |wanted| event.entity_type == *wanted)
                && filters.operation.as_ref().map_or(true, |wanted| event.operation == *wanted)
                && filters.start_time.as_ref().map_or(true, |start| event.timestamp >= *start)
                && filters.end_time.as_ref().map_or(true, |end| event.timestamp <= *end)
                && filters.severity.as_ref().map_or(true, |wanted| event.severity == *wanted)
                && filters.result.as_ref().map_or(true, |wanted| event.result == *wanted)
        };

        let events = self.events.lock().await;
        // Filter first so only the matching events are cloned.
        Ok(events.iter().filter(|event| matches(*event)).cloned().collect())
    }

    /// Removes every event whose timestamp is at or before `before` and returns
    /// the number of removed events.
    async fn cleanup(&self, before: &DateTime<Utc>) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
        let mut events = self.events.lock().await;
        let count_before = events.len();
        events.retain(|event| event.timestamp > *before);
        Ok((count_before - events.len()) as u64)
    }
}
/// Audit storage that appends one line per event to a file.
#[derive(Debug)]
pub struct FileAuditStorage {
/// Path of the log file, as given to `new`.
log_path: std::path::PathBuf,
/// Open handle to the log file (opened in append mode), guarded for serialized writes.
file: Mutex<tokio::fs::File>,
/// `true` writes each event as a JSON document; `false` writes a bracketed text line.
json_format: bool,
}
impl FileAuditStorage {
    /// Opens (creating if necessary) the log file at `log_path` in append mode.
    ///
    /// Missing parent directories are created first. `json_format` selects JSON
    /// lines over the plain-text line format.
    ///
    /// # Errors
    /// Returns the I/O error if the directories or the file cannot be created/opened.
    pub async fn new(log_path: impl AsRef<std::path::Path>, json_format: bool) -> Result<Self, std::io::Error> {
        let log_path = std::path::PathBuf::from(log_path.as_ref());

        match log_path.parent() {
            Some(directory) => tokio::fs::create_dir_all(directory).await?,
            None => {}
        }

        let mut options = tokio::fs::OpenOptions::new();
        options.create(true).append(true);
        let handle = options.open(&log_path).await?;

        Ok(Self {
            log_path,
            file: Mutex::new(handle),
            json_format,
        })
    }

    /// Path of the log file this storage writes to.
    pub fn log_path(&self) -> &std::path::Path {
        self.log_path.as_path()
    }
}
#[async_trait]
impl AuditStorage for FileAuditStorage {
    /// Appends `event` to the log file as a single line and flushes the handle.
    ///
    /// In JSON mode the event is serialized with serde (which escapes control
    /// characters). In text mode the line has the shape
    /// `[timestamp] user - OPERATION entity_type - entity_id - RESULT`; carriage
    /// returns and line feeds inside the caller-supplied fields are written as
    /// the two-character sequences `\r` / `\n` so a crafted value cannot forge
    /// additional audit lines.
    ///
    /// # Errors
    /// Returns the serialization or I/O error if the line cannot be produced or written.
    async fn store(&self, event: &AuditEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut log_line = if self.json_format {
            serde_json::to_string(event)?
        } else {
            // Keep every record on exactly one physical line.
            let single_line = |text: &str| text.replace('\r', "\\r").replace('\n', "\\n");
            format!(
                "[{}] {} - {} {} - {} - {}",
                event.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
                single_line(&event.user_id),
                single_line(&event.operation.to_string()),
                single_line(&event.entity_type),
                single_line(&event.entity_id),
                event.result
            )
        };
        log_line.push('\n');

        let mut file = self.file.lock().await;
        tokio::io::AsyncWriteExt::write_all(&mut *file, log_line.as_bytes()).await?;
        tokio::io::AsyncWriteExt::flush(&mut *file).await?;
        Ok(())
    }

    /// Querying is not implemented for file storage: always returns an empty list.
    async fn query(
        &self,
        _filters: &AuditQueryFilters,
    ) -> Result<Vec<AuditEvent>, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Vec::new())
    }

    /// Cleanup is not implemented for file storage: nothing is removed and `0` is returned.
    async fn cleanup(&self, _before: &DateTime<Utc>) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
        Ok(0)
    }
}
/// Audit storage backed by a SQL table reached through a sea-orm connection.
///
/// The statements in this file are issued with the Postgres backend.
#[derive(Debug, Clone)]
pub struct DatabaseAuditStorage {
/// Connection used for every statement.
pool: sea_orm::DatabaseConnection,
/// Name of the audit table (defaults to `audit_logs` in `new`).
table_name: String,
}
impl DatabaseAuditStorage {
pub async fn new(pool: sea_orm::DatabaseConnection, table_name: Option<String>) -> Result<Self, sea_orm::DbErr> {
let table_name = table_name.unwrap_or_else(|| "audit_logs".to_string());
let create_table_sql = format!(
r#"
CREATE TABLE IF NOT EXISTS {} (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
user_id VARCHAR(255) NOT NULL,
operation VARCHAR(50) NOT NULL,
entity_type VARCHAR(255),
entity_id VARCHAR(255),
severity VARCHAR(20) NOT NULL,
result VARCHAR(20) NOT NULL,
error_message TEXT,
ip_address VARCHAR(45),
user_agent TEXT,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_{}_timestamp ON {} (timestamp);
CREATE INDEX IF NOT EXISTS idx_{}_user_id ON {} (user_id);
CREATE INDEX IF NOT EXISTS idx_{}_entity_type ON {} (entity_type);
"#,
table_name, table_name, table_name, table_name, table_name, table_name, table_name
);
pool.execute_raw(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
&create_table_sql,
vec![],
))
.await?;
Ok(Self { pool, table_name })
}
pub fn table_name(&self) -> &str {
&self.table_name
}
}
#[async_trait]
impl AuditStorage for DatabaseAuditStorage {
async fn store(&self, event: &AuditEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let insert_sql = format!(
r#"
INSERT INTO {} (id, timestamp, user_id, operation, entity_type, entity_id, user_role, client_ip, severity, result, before_value, after_value, extra, request_id, session_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
"#,
self.table_name
);
self.pool
.execute_raw(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
&insert_sql,
vec![
event.id.clone().into(),
event.timestamp.to_rfc3339().into(),
event.user_id.clone().into(),
event.operation.to_string().into(),
event.entity_type.clone().into(),
event.entity_id.clone().into(),
event.user_role.clone().into(),
event.client_ip.clone().into(),
event.severity.to_string().into(),
event.result.to_string().into(),
event.before_value.clone().unwrap_or_default().into(),
event.after_value.clone().unwrap_or_default().into(),
event.extra.clone().unwrap_or_default().into(),
event.request_id.clone().into(),
event.session_id.clone().into(),
],
))
.await?;
Ok(())
}
async fn query(
&self,
filters: &AuditQueryFilters,
) -> Result<Vec<AuditEvent>, Box<dyn std::error::Error + Send + Sync>> {
let mut query = format!("SELECT * FROM {} WHERE 1=1", self.table_name);
let mut conditions = Vec::new();
let mut params = Vec::new();
let mut param_index = 1;
if let Some(user_id) = &filters.user_id {
conditions.push(format!("user_id = ${}", param_index));
params.push(user_id.clone());
param_index += 1;
}
if let Some(entity_type) = &filters.entity_type {
conditions.push(format!("entity_type = ${}", param_index));
params.push(entity_type.clone());
param_index += 1;
}
if let Some(operation) = &filters.operation {
conditions.push(format!("operation = ${}", param_index));
params.push(operation.to_string());
param_index += 1;
}
if let Some(start_time) = &filters.start_time {
conditions.push(format!("timestamp >= ${}", param_index));
params.push(start_time.to_rfc3339());
param_index += 1;
}
if let Some(end_time) = &filters.end_time {
conditions.push(format!("timestamp <= ${}", param_index));
params.push(end_time.to_rfc3339());
param_index += 1;
}
if let Some(severity) = &filters.severity {
conditions.push(format!("severity = ${}", param_index));
params.push(severity.to_string());
param_index += 1;
}
if let Some(result) = &filters.result {
conditions.push(format!("result = ${}", param_index));
params.push(result.to_string());
param_index += 1;
}
if !conditions.is_empty() {
query.push_str(" AND ");
query.push_str(&conditions.join(" AND "));
}
query.push_str(" ORDER BY timestamp DESC LIMIT 1000");
let result = self
.pool
.query_all_raw(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
&query,
params.into_iter().map(|s| s.into()).collect::<Vec<_>>(),
))
.await?;
let mut events = Vec::new();
for row in result {
let operation_str: String = row.try_get("", "operation")?;
let operation = match operation_str.as_str() {
"CREATE" => AuditOperation::Create,
"READ" => AuditOperation::Read,
"UPDATE" => AuditOperation::Update,
"DELETE" => AuditOperation::Delete,
"LOGIN" => AuditOperation::Login,
"LOGOUT" => AuditOperation::Logout,
"PERMISSION_CHANGE" => AuditOperation::PermissionChange,
"CONFIG_CHANGE" => AuditOperation::ConfigChange,
_ => AuditOperation::Other(operation_str),
};
let severity_str: String = row.try_get("", "severity")?;
let severity = match severity_str.as_str() {
"INFO" => AuditSeverity::Info,
"LOW" => AuditSeverity::Low,
"MEDIUM" => AuditSeverity::Medium,
"HIGH" => AuditSeverity::High,
"CRITICAL" => AuditSeverity::Critical,
_ => AuditSeverity::Info,
};
let result_str: String = row.try_get("", "result")?;
let result = match result_str.as_str() {
"SUCCESS" => AuditResult::Success,
"FAILURE" => AuditResult::Failure,
_ => AuditResult::Success,
};
let timestamp_str: String = row.try_get("", "timestamp")?;
let timestamp = DateTime::parse_from_rfc3339(×tamp_str)?.with_timezone(&Utc);
events.push(AuditEvent {
id: row.try_get("", "id")?,
timestamp,
operation,
entity_type: row.try_get("", "entity_type")?,
entity_id: row.try_get("", "entity_id")?,
user_id: row.try_get("", "user_id")?,
user_role: row.try_get("", "user_role")?,
client_ip: row.try_get("", "client_ip")?,
severity,
result,
before_value: row.try_get("", "before_value")?,
after_value: row.try_get("", "after_value")?,
extra: row.try_get("", "extra")?,
request_id: row.try_get("", "request_id")?,
session_id: row.try_get("", "session_id")?,
});
}
Ok(events)
}
async fn cleanup(&self, before: &DateTime<Utc>) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let delete_sql = format!("DELETE FROM {} WHERE timestamp < $1", self.table_name);
let result = self
.pool
.execute_raw(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
&delete_sql,
vec![before.to_rfc3339().into()],
))
.await?;
Ok(result.rows_affected())
}
async fn store_batch(&self, events: &[AuditEvent]) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if events.is_empty() {
return Ok(());
}
let mut values = Vec::new();
for event in events {
values.push(format!(
"('{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}')",
event.id.replace('\'', "''"),
event.timestamp.to_rfc3339(),
event.user_id.replace('\'', "''"),
event.operation.to_string(),
event.entity_type.replace('\'', "''"),
event.entity_id.replace('\'', "''"),
event.user_role.replace('\'', "''"),
event.client_ip.replace('\'', "''"),
event.severity.to_string(),
event.result.to_string(),
event
.before_value
.as_ref()
.map(|s| format!("'{}'", s.replace('\'', "''")))
.unwrap_or("NULL".to_string()),
event
.after_value
.as_ref()
.map(|s| format!("'{}'", s.replace('\'', "''")))
.unwrap_or("NULL".to_string()),
event
.extra
.as_ref()
.map(|s| format!("'{}'", s.replace('\'', "''")))
.unwrap_or("NULL".to_string()),
event.request_id.replace('\'', "''"),
event.session_id.replace('\'', "''")
));
}
let insert_sql = format!(
"INSERT INTO {} (id, timestamp, user_id, operation, entity_type, entity_id, user_role, client_ip, severity, result, before_value, after_value, extra, request_id, session_id) VALUES {}",
self.table_name,
values.join(", ")
);
self.pool
.execute_raw(sea_orm::Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
&insert_sql,
vec![],
))
.await?;
Ok(())
}
}
/// Shared, thread-safe callback invoked with the event that triggered an alert.
type AuditAlertCallback = Arc<dyn Fn(&AuditEvent) + Send + Sync>;
/// Front end that sanitizes audit events, stores them through an `AuditStorage`
/// backend and raises alerts for configured operations.
pub struct AuditLogger {
/// Behavior settings (enable switch, sensitive fields, alert rules, retries).
config: AuditConfig,
/// Backend receiving the sanitized events.
storage: Arc<dyn AuditStorage>,
/// Optional callback invoked when an alert is triggered.
alert_callback: Option<AuditAlertCallback>,
}
impl AuditLogger {
pub fn new(config: AuditConfig, storage: Arc<dyn AuditStorage>) -> Self {
Self {
config,
storage,
alert_callback: None,
}
}
pub fn with_default_storage() -> Self {
Self::new(AuditConfig::default(), Arc::new(MemoryAuditStorage::new(10000)))
}
pub fn set_alert_callback<F>(&mut self, callback: F)
where
F: Fn(&AuditEvent) + Send + Sync + 'static,
{
self.alert_callback = Some(Arc::new(callback));
}
pub async fn log(&self, event: AuditEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !self.config.enabled {
return Ok(());
}
let event = self.sanitize_event(event);
let max_retries = self.config.max_retries.unwrap_or(3);
let mut last_error = None;
for attempt in 0..=max_retries {
match self.storage.store(&event).await {
Ok(()) => break,
Err(e) => {
last_error = Some(e);
if attempt < max_retries {
let delay = std::time::Duration::from_millis(100 * (2_u64.pow(attempt as u32)));
tokio::time::sleep(delay).await;
tracing::warn!(
"Audit log storage failed (attempt {}/{}), retrying after {:?}",
attempt + 1,
max_retries,
delay
);
}
}
}
}
if let Some(error) = last_error {
tracing::error!("Failed to store audit event after {} retries: {}", max_retries, error);
return Err(error);
}
if self.should_alert(&event) {
self.trigger_alert(&event);
}
Ok(())
}
pub async fn log_create(
&self,
entity_type: &str,
entity_id: &str,
user_id: &str,
value: Option<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let event = AuditEvent::create(entity_type, entity_id, user_id);
let event = match value {
Some(ref v) => event.with_after_value(v),
None => event,
};
self.log(event).await
}
pub async fn log_read(
&self,
entity_type: &str,
entity_id: &str,
user_id: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let event = AuditEvent::read(entity_type, entity_id, user_id);
self.log(event).await
}
pub async fn log_update(
&self,
entity_type: &str,
entity_id: &str,
user_id: &str,
before: Option<String>,
after: Option<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let event = AuditEvent::update(entity_type, entity_id, user_id, before, after);
self.log(event).await
}
pub async fn log_delete(
&self,
entity_type: &str,
entity_id: &str,
user_id: &str,
before: Option<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let event = AuditEvent::delete(entity_type, entity_id, user_id).with_severity(AuditSeverity::High);
let event = match before {
Some(ref v) => event.with_before_value(v),
None => event,
};
self.log(event).await
}
pub async fn query(
&self,
filters: &AuditQueryFilters,
) -> Result<Vec<AuditEvent>, Box<dyn std::error::Error + Send + Sync>> {
self.storage.query(filters).await
}
pub async fn cleanup(&self, days: i64) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let delta = chrono::Duration::try_days(days).ok_or("Invalid date calculation")?;
let before = Utc::now().checked_sub_signed(delta).ok_or("Invalid date calculation")?;
self.storage.cleanup(&before).await
}
fn sanitize_event(&self, mut event: AuditEvent) -> AuditEvent {
let sanitize_value = |value: Option<String>| -> Option<String> {
if let Some(v) = value {
if self.config.sensitive_fields.is_empty() {
return Some(v);
}
let mut replacements = Vec::new();
for field in &self.config.sensitive_fields {
let replacement = format!("***REDACTED_{}***", field.to_uppercase());
replacements.push((format!(r#""{}":"#, field), format!(r#""{}":"#, &replacement)));
replacements.push((format!(r#"{}:"#, field), format!(r#"{}:"#, &replacement)));
if field.contains('.') {
replacements.push((format!(r#""{}""#, field), format!(r#""{}""#, &replacement)));
}
}
let mut result = v;
for (pattern, replacement) in &replacements {
result = result.replace(pattern, replacement);
}
for field in &self.config.sensitive_fields {
let replacement = format!("***REDACTED_{}***", field.to_uppercase());
result = Self::sanitize_generic_base64(&result, field, &replacement);
result = Self::sanitize_json_arrays(&result, field, &replacement);
}
Some(result)
} else {
None
}
};
event.before_value = sanitize_value(event.before_value);
event.after_value = sanitize_value(event.after_value);
event.extra = sanitize_value(event.extra);
event
}
fn sanitize_generic_base64(value: &str, field: &str, replacement: &str) -> String {
let mut result = value.to_string();
if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(value) {
if let Some(obj) = json_val.as_object() {
let mut modified = false;
let mut new_obj = serde_json::Map::new();
let underscore_str = String::from("_");
let field_with_underscore = format!("{}{}", underscore_str, field);
for (k, v) in obj {
if k == field || k.contains(&field_with_underscore) {
let redacted_key = format!("{}{}redacted", k, underscore_str);
new_obj.insert(redacted_key, serde_json::Value::String(replacement.to_string()));
modified = true;
} else if v.is_string() {
let s = v.as_str().unwrap_or("");
if Self::is_base64(s) {
new_obj.insert(k.clone(), serde_json::Value::String(replacement.to_string()));
modified = true;
} else {
new_obj.insert(k.clone(), v.clone());
}
} else {
new_obj.insert(k.clone(), v.clone());
}
}
if modified {
result = serde_json::to_string(&new_obj).unwrap_or(result);
}
}
else if let Some(arr) = json_val.as_array() {
let mut modified = false;
let mut new_arr = Vec::new();
for item in arr {
if let Some(obj) = item.as_object() {
let mut new_obj = serde_json::Map::new();
for (k, v) in obj {
let should_mask = k == field
|| k.contains(field)
|| (v.is_string() && Self::is_base64(v.as_str().unwrap_or("")));
if should_mask {
new_obj.insert(k.clone(), serde_json::Value::String(replacement.to_string()));
modified = true;
} else {
new_obj.insert(k.clone(), v.clone());
}
}
new_arr.push(serde_json::Value::Object(new_obj));
} else {
new_arr.push(item.clone());
}
}
if modified {
result = serde_json::to_string(&new_arr).unwrap_or(result);
}
}
}
result
}
fn sanitize_json_arrays(value: &str, field: &str, replacement: &str) -> String {
let array_pattern = format!(r#"{{"{}","#, field);
if !value.contains(&array_pattern) {
return value.to_string();
}
if let Ok(json_val) = serde_json::from_str::<serde_json::Value>(value) {
if let Some(arr) = json_val.as_array() {
let mut modified = false;
let mut new_arr = Vec::new();
for item in arr {
if let Some(obj) = item.as_object() {
let mut new_obj = serde_json::Map::new();
for (k, v) in obj {
if k == field {
new_obj.insert(k.clone(), serde_json::Value::String(replacement.to_string()));
modified = true;
} else {
new_obj.insert(k.clone(), v.clone());
}
}
new_arr.push(serde_json::Value::Object(new_obj));
} else {
new_arr.push(item.clone());
}
}
if modified {
return serde_json::to_string(&new_arr).unwrap_or(value.to_string());
}
}
}
value.to_string()
}
fn is_base64(s: &str) -> bool {
if s.len() % 4 != 0 || s.is_empty() {
return false;
}
let valid_chars: std::collections::HashSet<char> =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
.chars()
.collect();
s.chars().all(|c| valid_chars.contains(&c) || c == '=')
}
fn should_alert(&self, event: &AuditEvent) -> bool {
if !self.config.enabled {
return false;
}
self.config.alert_operations.contains(&event.operation)
}
fn trigger_alert(&self, event: &AuditEvent) {
if let Some(callback) = &self.alert_callback {
callback(event);
}
let msg = format!(
"[AUDIT ALERT] {} - {} {} on {} by user {}",
event.severity, event.operation, event.entity_id, event.entity_type, event.user_id
);
tracing::warn!("{}", msg);
}
}
/// Caller/request details that can accompany audited operations.
#[derive(Debug, Default, Clone)]
pub struct AuditContext {
/// Identifier of the acting user.
pub user_id: String,
/// Role of the acting user.
pub user_role: String,
/// Client IP address as text.
pub client_ip: String,
/// Request correlation id; `AuditContext::new` generates a random UUID v4 string.
pub request_id: String,
/// Session identifier; empty unless set via `with_session_id`.
pub session_id: String,
}
impl AuditContext {
    /// Builds a context for the given user, role and client IP, with a freshly
    /// generated UUID v4 request id and an empty session id.
    pub fn new(user_id: &str, role: &str, client_ip: &str) -> Self {
        let request_id = Uuid::new_v4().to_string();
        Self {
            user_id: user_id.to_owned(),
            user_role: role.to_owned(),
            client_ip: client_ip.to_owned(),
            request_id,
            session_id: String::new(),
        }
    }

    /// Returns the context with its request id replaced.
    pub fn with_request_id(self, request_id: &str) -> Self {
        Self {
            request_id: request_id.to_owned(),
            ..self
        }
    }

    /// Returns the context with its session id replaced.
    pub fn with_session_id(self, session_id: &str) -> Self {
        Self {
            session_id: session_id.to_owned(),
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
#[tokio::test]
async fn test_audit_event_creation() {
let event = AuditEvent::create("users", "1", "admin");
assert_eq!(event.operation, AuditOperation::Create);
assert_eq!(event.entity_type, "users");
assert_eq!(event.entity_id, "1");
assert_eq!(event.user_id, "admin");
}
#[tokio::test]
async fn test_audit_event_update() {
let before = r#"{"name": "old"}"#;
let after = r#"{"name": "new"}"#;
let event = AuditEvent::update("users", "1", "admin", Some(before.to_string()), Some(after.to_string()));
assert_eq!(event.operation, AuditOperation::Update);
assert_eq!(event.before_value, Some(before.to_string()));
assert_eq!(event.after_value, Some(after.to_string()));
}
#[tokio::test]
async fn test_audit_event_setters_and_default_storage() {
let event = AuditEvent::create("users", "1", "admin")
.with_user("role", "127.0.0.1")
.with_result(AuditResult::Failure)
.with_severity(AuditSeverity::High)
.with_extra("x")
.with_before_value("b")
.with_after_value("a")
.with_request_id("r")
.with_session_id("s");
assert_eq!(event.user_role, "role");
assert_eq!(event.client_ip, "127.0.0.1");
assert_eq!(event.result, AuditResult::Failure);
assert_eq!(event.severity, AuditSeverity::High);
assert_eq!(event.extra.as_deref(), Some("x"));
assert_eq!(event.before_value.as_deref(), Some("b"));
assert_eq!(event.after_value.as_deref(), Some("a"));
assert_eq!(event.request_id, "r");
assert_eq!(event.session_id, "s");
let storage = MemoryAuditStorage::default();
storage.store(&event).await.expect("Storage operation should succeed");
assert_eq!(storage.event_count().await, 1);
}
#[test]
fn test_audit_event_json_roundtrip() {
let event = AuditEvent::create("users", "1", "admin")
.with_user("role", "127.0.0.1")
.with_result(AuditResult::Success)
.with_severity(AuditSeverity::Medium)
.with_extra("x")
.with_before_value("b")
.with_after_value("a")
.with_request_id("r")
.with_session_id("s");
let json = event.to_json().unwrap();
let parsed = AuditEvent::from_json(&json).unwrap();
assert_eq!(parsed.operation, event.operation);
assert_eq!(parsed.entity_type, event.entity_type);
assert_eq!(parsed.entity_id, event.entity_id);
assert_eq!(parsed.user_id, event.user_id);
assert_eq!(parsed.user_role, event.user_role);
assert_eq!(parsed.client_ip, event.client_ip);
assert_eq!(parsed.result, event.result);
assert_eq!(parsed.severity, event.severity);
assert_eq!(parsed.extra, event.extra);
assert_eq!(parsed.before_value, event.before_value);
assert_eq!(parsed.after_value, event.after_value);
assert_eq!(parsed.request_id, event.request_id);
assert_eq!(parsed.session_id, event.session_id);
}
#[tokio::test]
async fn test_audit_logger_helpers_and_alert_disabled() {
let storage = Arc::new(MemoryAuditStorage::new(10));
let logger = AuditLogger::new(
AuditConfig {
enabled: false,
alert_operations: vec![AuditOperation::Delete],
..Default::default()
},
storage.clone(),
);
logger.log_create("t", "1", "u", Some("v".to_string())).await.unwrap();
logger.log_read("t", "1", "u").await.unwrap();
logger
.log_update("t", "1", "u", Some("b".to_string()), Some("a".to_string()))
.await
.unwrap();
logger.log_delete("t", "1", "u", None).await.unwrap();
assert_eq!(storage.event_count().await, 0);
assert!(!logger.should_alert(&AuditEvent::delete("t", "1", "u")));
}
#[tokio::test]
async fn test_audit_log_create_none_branch_and_cleanup_success() {
let storage = Arc::new(MemoryAuditStorage::new(10));
let logger = AuditLogger::new(AuditConfig::default(), storage.clone());
logger.log_create("t", "1", "u", None).await.unwrap();
let mut old = AuditEvent::create("t", "2", "u");
old.timestamp = Utc::now() - chrono::Duration::days(2);
logger.log(old).await.unwrap();
let removed = logger.cleanup(1).await.unwrap();
assert_eq!(removed, 1);
assert_eq!(storage.event_count().await, 1);
}
/// Logs an event whose `after_value` JSON mixes a numeric and a string field
/// and checks that the stored value still contains the `count` key.
#[tokio::test]
async fn test_audit_sanitize_base64_non_string_values() {
    let sink = Arc::new(MemoryAuditStorage::new(10));
    let logger = AuditLogger::new(AuditConfig::default(), sink);

    let payload = r#"{"count":1,"name":"x"}"#;
    let event = AuditEvent::create("t", "1", "u").with_after_value(payload);
    logger.log(event).await.unwrap();

    let everything = AuditQueryFilters::default();
    let found = logger.query(&everything).await.unwrap();
    assert_eq!(found.len(), 1);

    let stored_after = found[0].after_value.as_ref().unwrap();
    assert!(stored_after.contains("count"));
}
/// Round-trip check: one logged `users` event is returned by a query that
/// filters on `entity_type == "users"`.
#[tokio::test]
async fn test_audit_logger() {
    let sink = Arc::new(MemoryAuditStorage::new(100));
    let logger = AuditLogger::new(AuditConfig::default(), sink);

    logger
        .log(AuditEvent::create("users", "1", "admin"))
        .await
        .unwrap();

    let users_only = AuditQueryFilters {
        entity_type: Some(String::from("users")),
        ..Default::default()
    };
    let found = logger.query(&users_only).await.unwrap();

    assert_eq!(found.len(), 1);
    assert_eq!(found[0].entity_type, "users");
}
/// Checks that a `password` field in `after_value` is replaced by the
/// redaction marker while the `name` field is still present after logging.
#[tokio::test]
async fn test_audit_sanitization() {
    let sink = Arc::new(MemoryAuditStorage::new(100));
    let logger = AuditLogger::new(AuditConfig::default(), sink);

    let raw_after = r#"{"password": "secret123", "name": "test"}"#;
    let event = AuditEvent::create("users", "1", "admin").with_after_value(raw_after);
    logger.log(event).await.unwrap();

    let found = logger.query(&AuditQueryFilters::default()).await.unwrap();
    let sanitized = found[0].after_value.as_ref().unwrap();

    assert!(sanitized.contains("***REDACTED_PASSWORD***"));
    assert!(sanitized.contains("name"));
}
/// Checks that `AuditContext::new` stores the user id, role and client IP it
/// is given and produces a non-empty `request_id`.
///
/// This is a plain synchronous test: nothing here awaits, so no async runtime
/// is started for it.
#[test]
fn test_audit_context() {
    let ctx = AuditContext::new("user123", "admin", "192.168.1.1");

    assert_eq!(ctx.user_id, "user123");
    assert_eq!(ctx.user_role, "admin");
    assert_eq!(ctx.client_ip, "192.168.1.1");
    // A request id must be present even when the caller did not supply one.
    assert!(!ctx.request_id.is_empty());
}
/// Table-driven check of the `Display` output of every `AuditOperation`,
/// `AuditSeverity` and `AuditResult` variant, plus the label of the default
/// `AuditOperation`.
#[test]
fn test_audit_enum_display_and_defaults() {
    let operation_labels = [
        (AuditOperation::Create, "CREATE"),
        (AuditOperation::Read, "READ"),
        (AuditOperation::Update, "UPDATE"),
        (AuditOperation::Delete, "DELETE"),
        (AuditOperation::Login, "LOGIN"),
        (AuditOperation::Logout, "LOGOUT"),
        (AuditOperation::PermissionChange, "PERMISSION_CHANGE"),
        (AuditOperation::ConfigChange, "CONFIG_CHANGE"),
        // `Other` payloads are upper-cased when displayed.
        (AuditOperation::Other("custom_op".to_string()), "CUSTOM_OP"),
        (AuditOperation::default(), "UNKNOWN"),
    ];
    for (operation, label) in operation_labels {
        assert_eq!(operation.to_string(), label);
    }

    let severity_labels = [
        (AuditSeverity::Info, "INFO"),
        (AuditSeverity::Low, "LOW"),
        (AuditSeverity::Medium, "MEDIUM"),
        (AuditSeverity::High, "HIGH"),
        (AuditSeverity::Critical, "CRITICAL"),
    ];
    for (severity, label) in severity_labels {
        assert_eq!(severity.to_string(), label);
    }

    let result_labels = [
        (AuditResult::Success, "SUCCESS"),
        (AuditResult::Failure, "FAILURE"),
        (AuditResult::Partial, "PARTIAL"),
        (AuditResult::Unknown, "UNKNOWN"),
    ];
    for (result, label) in result_labels {
        assert_eq!(result.to_string(), label);
    }
}
/// Stores two events in a `MemoryAuditStorage` created with capacity 1 and
/// checks that one event is counted as dropped and the second one is the
/// event that remains queryable.
#[tokio::test]
async fn test_memory_storage_overflow_and_dropped_count() {
    let tiny = MemoryAuditStorage::new(1);
    assert_eq!(tiny.dropped_count(), 0);

    let first = AuditEvent::create("users", "1", "admin");
    let second = AuditEvent::create("users", "2", "admin");
    tiny.store(&first).await.unwrap();
    tiny.store(&second).await.unwrap();

    let held = tiny.event_count().await;
    assert_eq!(held, 1);
    assert_eq!(tiny.dropped_count(), 1);

    let survivors = tiny.query(&AuditQueryFilters::default()).await.unwrap();
    assert_eq!(survivors.len(), 1);
    // The surviving entry is the one stored last.
    assert_eq!(survivors[0].entity_id, "2");
}
/// Logs two events ten minutes apart, queries with every filter field set so
/// that only the newer one matches, then cleans up with a cutoff one minute
/// before the reference instant and expects exactly one removal.
#[tokio::test]
async fn test_audit_query_filters_all_fields_and_cleanup() {
    let sink = Arc::new(MemoryAuditStorage::new(100));
    let logger = AuditLogger::new(AuditConfig::default(), sink.clone());
    let reference = Utc::now();

    // Older event: ten minutes before the reference instant.
    let mut older = AuditEvent::create("users", "1", "u1")
        .with_user("admin", "10.0.0.1")
        .with_severity(AuditSeverity::Low)
        .with_result(AuditResult::Success)
        .with_request_id("r1")
        .with_session_id("s1");
    older.timestamp = reference - chrono::Duration::minutes(10);

    // Newer event: stamped exactly at the reference instant.
    let mut newer = AuditEvent::delete("orders", "9", "u2")
        .with_user("system", "10.0.0.2")
        .with_severity(AuditSeverity::High)
        .with_result(AuditResult::Failure);
    newer.timestamp = reference;

    logger.log(older.clone()).await.unwrap();
    logger.log(newer.clone()).await.unwrap();

    // Every filter field is populated and all of them describe `newer`.
    let window_start = reference - chrono::Duration::minutes(5);
    let window_end = reference + chrono::Duration::minutes(1);
    let newer_only = AuditQueryFilters {
        user_id: Some(String::from("u2")),
        entity_type: Some(String::from("orders")),
        operation: Some(AuditOperation::Delete),
        start_time: Some(window_start),
        end_time: Some(window_end),
        severity: Some(AuditSeverity::High),
        result: Some(AuditResult::Failure),
    };
    let hits = logger.query(&newer_only).await.unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].entity_id, "9");

    let cutoff = reference - chrono::Duration::minutes(1);
    let purged = sink.cleanup(&cutoff).await.unwrap();
    assert_eq!(purged, 1);
    assert_eq!(sink.event_count().await, 1);
}
/// Two checks in one test: a logger with `enabled: false` stores nothing, and
/// a logger from `with_default_storage` invokes the registered alert callback
/// when a delete is logged.
#[tokio::test]
async fn test_audit_logger_disabled_and_alert_callback() {
    // Part 1: a disabled logger accepts the event but does not persist it.
    let sink = Arc::new(MemoryAuditStorage::new(100));
    let muted_config = AuditConfig {
        enabled: false,
        ..Default::default()
    };
    let muted_logger = AuditLogger::new(muted_config, sink.clone());
    let ignored_event = AuditEvent::create("users", "1", "admin");
    muted_logger.log(ignored_event).await.unwrap();
    assert_eq!(sink.event_count().await, 0);

    // Part 2: the alert callback flips a shared flag when it runs.
    let fired = Arc::new(AtomicBool::new(false));
    let fired_in_callback = Arc::clone(&fired);
    let mut alerting_logger = AuditLogger::with_default_storage();
    alerting_logger.set_alert_callback(move |_event| {
        fired_in_callback.store(true, Ordering::SeqCst);
    });

    // NOTE(review): inside a raw string the backslashes are literal, so this
    // payload is not valid JSON. Confirm whether that is intentional.
    let before_value = r#"{\"password\":\"x\"}"#.to_string();
    alerting_logger
        .log_delete("users", "2", "admin", Some(before_value))
        .await
        .unwrap();

    assert!(fired.load(Ordering::SeqCst));
}
/// Adds `user.password` to the configured sensitive fields, logs a payload
/// with several sensitive-looking keys, and checks the redaction markers in
/// the stored value. Also spot-checks `AuditLogger::is_base64` directly.
#[tokio::test]
async fn test_audit_sanitization_base64_and_nested_field() {
    let sink = Arc::new(MemoryAuditStorage::new(100));
    let mut config = AuditConfig::default();
    config.sensitive_fields.push(String::from("user.password"));
    let logger = AuditLogger::new(config, sink);

    // "c2VjcmV0" is the base64 encoding of "secret".
    let payload = r#"{"password":"p","_password":"p2","data":"c2VjcmV0","user.password":"v"}"#;
    logger
        .log(AuditEvent::create("users", "1", "admin").with_after_value(payload))
        .await
        .unwrap();

    let found = logger.query(&AuditQueryFilters::default()).await.unwrap();
    let sanitized = found[0].after_value.as_ref().unwrap();

    // Every one of these fragments must appear in the sanitized output.
    let expected_fragments = [
        "***REDACTED_PASSWORD***",
        "_password_redacted",
        r#""data":"***REDACTED_PASSWORD***""#,
        "***REDACTED_USER.PASSWORD***",
    ];
    for fragment in expected_fragments {
        assert!(sanitized.contains(fragment));
    }

    // Inputs that must not be classified as base64.
    for rejected in ["", "abc", "!!!!"] {
        assert!(!AuditLogger::is_base64(rejected));
    }
    assert!(AuditLogger::is_base64("c2VjcmV0"));
}
/// Checks that `cleanup` reports an error, rather than succeeding, when it is
/// called with `i64::MAX`.
#[tokio::test]
async fn test_audit_logger_cleanup_invalid_date_calculation() {
    let sink = Arc::new(MemoryAuditStorage::new(100));
    let logger = AuditLogger::new(AuditConfig::default(), sink);

    let outcome = logger.cleanup(i64::MAX).await;

    assert!(outcome.is_err());
}
/// Checks that `with_request_id` and `with_session_id` set the corresponding
/// `AuditContext` fields to the values they are given.
#[test]
fn test_audit_context_setters() {
    let base = AuditContext::new("u", "r", "ip");
    let configured = base.with_request_id("req").with_session_id("sess");

    assert_eq!(configured.request_id, "req");
    assert_eq!(configured.session_id, "sess");
}
}
/// Step-by-step builder for [`AuditEvent`].
///
/// `operation`, `entity_type` and `entity_id` are required: `build` panics if
/// any of them was never set. All other fields are optional and fall back to
/// the defaults described on each field.
#[derive(Debug, Default)]
pub struct AuditEventBuilder {
    /// Operation being audited. Required by `build`.
    operation: Option<AuditOperation>,
    /// Entity type the event refers to. Required by `build`.
    entity_type: Option<String>,
    /// Entity identifier the event refers to. Required by `build`.
    entity_id: Option<String>,
    /// User id; becomes an empty string when unset.
    user_id: Option<String>,
    /// User role; becomes an empty string when unset.
    user_role: Option<String>,
    /// Client IP; becomes an empty string when unset.
    client_ip: Option<String>,
    /// Severity copied into the built event as-is.
    severity: AuditSeverity,
    /// Result copied into the built event as-is.
    result: AuditResult,
    /// Optional value before the operation, passed through unchanged.
    before_value: Option<String>,
    /// Optional value after the operation, passed through unchanged.
    after_value: Option<String>,
    /// Optional extra data, passed through unchanged.
    extra: Option<String>,
    /// Request id; `build` generates a fresh UUID v4 string when unset.
    request_id: Option<String>,
    /// Session id; becomes an empty string when unset.
    session_id: Option<String>,
}
impl AuditEventBuilder {
    /// Creates a builder with no fields set, `AuditSeverity::Info` severity
    /// and `AuditResult::Success` result.
    pub fn new() -> Self {
        // The derived `Default` yields `None` for every optional field and the
        // `#[default]` variants (`Info`, `Success`) for the two enums.
        Self::default()
    }

    /// Sets the audited operation (required by [`build`](Self::build)).
    pub fn operation(self, operation: AuditOperation) -> Self {
        Self {
            operation: Some(operation),
            ..self
        }
    }

    /// Sets the entity type (required by [`build`](Self::build)).
    pub fn entity_type(self, entity_type: &str) -> Self {
        Self {
            entity_type: Some(entity_type.to_owned()),
            ..self
        }
    }

    /// Sets the entity id (required by [`build`](Self::build)).
    pub fn entity_id(self, entity_id: &str) -> Self {
        Self {
            entity_id: Some(entity_id.to_owned()),
            ..self
        }
    }

    /// Sets the user id.
    pub fn user_id(self, user_id: &str) -> Self {
        Self {
            user_id: Some(user_id.to_owned()),
            ..self
        }
    }

    /// Sets the user role.
    pub fn user_role(self, user_role: &str) -> Self {
        Self {
            user_role: Some(user_role.to_owned()),
            ..self
        }
    }

    /// Sets the client IP.
    pub fn client_ip(self, client_ip: &str) -> Self {
        Self {
            client_ip: Some(client_ip.to_owned()),
            ..self
        }
    }

    /// Overrides the severity.
    pub fn severity(self, severity: AuditSeverity) -> Self {
        Self { severity, ..self }
    }

    /// Overrides the result.
    pub fn result(self, result: AuditResult) -> Self {
        Self { result, ..self }
    }

    /// Sets the value recorded as the state before the operation.
    pub fn before_value(self, value: &str) -> Self {
        Self {
            before_value: Some(value.to_owned()),
            ..self
        }
    }

    /// Sets the value recorded as the state after the operation.
    pub fn after_value(self, value: &str) -> Self {
        Self {
            after_value: Some(value.to_owned()),
            ..self
        }
    }

    /// Sets the extra data attached to the event.
    pub fn extra(self, value: &str) -> Self {
        Self {
            extra: Some(value.to_owned()),
            ..self
        }
    }

    /// Sets the request id; when never called, `build` generates one.
    pub fn request_id(self, request_id: &str) -> Self {
        Self {
            request_id: Some(request_id.to_owned()),
            ..self
        }
    }

    /// Sets the session id.
    pub fn session_id(self, session_id: &str) -> Self {
        Self {
            session_id: Some(session_id.to_owned()),
            ..self
        }
    }

    /// Consumes the builder and produces an [`AuditEvent`] with a fresh UUID
    /// v4 `id` and the current UTC time as `timestamp`.
    ///
    /// Unset `user_id`, `user_role`, `client_ip` and `session_id` become empty
    /// strings; an unset `request_id` becomes a new UUID v4 string.
    ///
    /// # Panics
    ///
    /// Panics if `operation`, `entity_type` or `entity_id` was never set.
    pub fn build(self) -> AuditEvent {
        // Required fields are checked in declaration order, so the first
        // missing one determines the panic message.
        let operation = match self.operation {
            Some(operation) => operation,
            None => panic!("AuditEventBuilder: operation is required"),
        };
        let entity_type = match self.entity_type {
            Some(entity_type) => entity_type,
            None => panic!("AuditEventBuilder: entity_type is required"),
        };
        let entity_id = match self.entity_id {
            Some(entity_id) => entity_id,
            None => panic!("AuditEventBuilder: entity_id is required"),
        };

        AuditEvent {
            id: Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            operation,
            entity_type,
            entity_id,
            user_id: self.user_id.unwrap_or_default(),
            user_role: self.user_role.unwrap_or_default(),
            client_ip: self.client_ip.unwrap_or_default(),
            severity: self.severity,
            result: self.result,
            before_value: self.before_value,
            after_value: self.after_value,
            extra: self.extra,
            request_id: self
                .request_id
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
            session_id: self.session_id.unwrap_or_default(),
        }
    }
}