use chrono::{DateTime, Utc};
use sqlx::Row as _;
use crate::error::{Error, Result};
use crate::orm::Db;
/// DDL for the `rustio_admin_actions` audit table; executed by [`ensure_table`].
///
/// `IF NOT EXISTS` lets the statement be issued repeatedly. The `metadata`,
/// `correlation_id` and `session_id` columns are not declared here;
/// [`ensure_table`] adds them afterwards with separate `ALTER TABLE` statements.
pub(crate) const CREATE_TABLE_SQL: &str = "CREATE TABLE IF NOT EXISTS rustio_admin_actions (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES rustio_users(id) ON DELETE CASCADE,
action_type TEXT NOT NULL,
model_name TEXT NOT NULL,
object_id BIGINT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
ip_address TEXT,
summary TEXT NOT NULL DEFAULT ''
)";
/// Index on `(model_name, object_id)`, the column pair [`for_object`] filters on.
pub(crate) const CREATE_MODEL_INDEX_SQL: &str =
"CREATE INDEX IF NOT EXISTS rustio_admin_actions_model_idx \
ON rustio_admin_actions(model_name, object_id)";
/// Index on `timestamp DESC`, the leading sort key of the queries issued by
/// [`recent`] and [`for_object`].
pub(crate) const CREATE_TIMESTAMP_INDEX_SQL: &str =
"CREATE INDEX IF NOT EXISTS rustio_admin_actions_timestamp_idx \
ON rustio_admin_actions(timestamp DESC)";
/// Brings the `rustio_admin_actions` schema up to date.
///
/// Runs, in order: the table and base index DDL, the `ALTER TABLE` statements
/// that add `metadata`, `correlation_id` and `session_id`, the partial indexes
/// on the latter two, and finally two `UPDATE`s that rewrite legacy
/// `model_name` values to `'users'` / `'groups'`.
///
/// Each statement is executed separately against `db.pool()`.
///
/// # Errors
/// Returns the first statement's failure; later statements are not attempted.
pub async fn ensure_table(db: &Db) -> Result<()> {
    // Order matters: columns must exist before the indexes that reference them.
    let statements: [&str; 10] = [
        CREATE_TABLE_SQL,
        CREATE_MODEL_INDEX_SQL,
        CREATE_TIMESTAMP_INDEX_SQL,
        "ALTER TABLE rustio_admin_actions ADD COLUMN IF NOT EXISTS metadata JSONB",
        "ALTER TABLE rustio_admin_actions ADD COLUMN IF NOT EXISTS correlation_id TEXT",
        "ALTER TABLE rustio_admin_actions ADD COLUMN IF NOT EXISTS session_id BIGINT",
        "CREATE INDEX IF NOT EXISTS rustio_admin_actions_correlation_idx \
         ON rustio_admin_actions (correlation_id) WHERE correlation_id IS NOT NULL",
        "CREATE INDEX IF NOT EXISTS rustio_admin_actions_session_idx \
         ON rustio_admin_actions (session_id) WHERE session_id IS NOT NULL",
        "UPDATE rustio_admin_actions \
         SET model_name = 'users' \
         WHERE model_name IN ('User', 'user', 'rustio_users')",
        "UPDATE rustio_admin_actions \
         SET model_name = 'groups' \
         WHERE model_name IN ('Group', 'group', 'rustio_groups')",
    ];

    for statement in statements.iter().copied() {
        sqlx::query(statement).execute(db.pool()).await?;
    }
    Ok(())
}
/// Coarse create / update / delete classification of an audit row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ActionType {
    Create,
    Update,
    Delete,
}

impl ActionType {
    /// Every variant in declaration order; [`ActionType::parse`] searches this list.
    const ALL: [Self; 3] = [Self::Create, Self::Update, Self::Delete];

    /// `(stored string, display label, CSS badge class)` for this variant.
    fn texts(self) -> (&'static str, &'static str, &'static str) {
        match self {
            Self::Create => ("create", "Created", "badge-success"),
            Self::Update => ("update", "Updated", "badge-neutral"),
            Self::Delete => ("delete", "Deleted", "badge-danger"),
        }
    }

    /// Lowercase string form of the variant (`"create"`, `"update"`, `"delete"`).
    pub fn as_str(self) -> &'static str {
        self.texts().0
    }

    /// Inverse of [`ActionType::as_str`]; the comparison is exact (case-sensitive),
    /// and any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL.iter().copied().find(|kind| kind.as_str() == s)
    }

    /// Past-tense label (`"Created"`, `"Updated"`, `"Deleted"`).
    pub fn label(self) -> &'static str {
        self.texts().1
    }

    /// CSS class name associated with the variant.
    pub fn pill_class(self) -> &'static str {
        self.texts().2
    }
}
/// One audit row as read back by [`recent`] / [`for_object`].
#[derive(Debug, Clone)]
pub struct AdminAction {
/// Primary key of the `rustio_admin_actions` row.
pub id: i64,
/// `user_id` column of the row.
pub user_id: i64,
/// `email` from the `LEFT JOIN` on `rustio_users`; `None` when the join
/// produced no value for it.
pub user_email: Option<String>,
/// Raw `action_type` column. `record` writes either an [`ActionType`]
/// string or an [`AuditEvent`] string here, so this is kept as text.
pub action_type: String,
/// `model_name` column of the row.
pub model_name: String,
/// `object_id` column of the row.
pub object_id: i64,
/// `timestamp` column of the row.
pub timestamp: DateTime<Utc>,
/// `ip_address` column; nullable in the schema.
pub ip_address: Option<String>,
/// `summary` column of the row.
pub summary: String,
/// `metadata` JSONB column; `None` when it is NULL or cannot be decoded.
pub metadata: Option<serde_json::Value>,
}
/// Input to [`record`]: everything needed to write one audit row.
///
/// Build with [`LogEntry::new`] and set the optional fields directly or via
/// the `with_*` helpers.
pub struct LogEntry<'a> {
/// Written to the `user_id` column. `record` rejects values `<= 0`.
pub user_id: i64,
/// Coarse action; its string is persisted unless `event` is set.
pub action_type: ActionType,
/// Written to the `model_name` column. `record` rejects blank values.
pub model_name: &'a str,
/// Written to the `object_id` column. `record` rejects negative values;
/// its error message states that bulk rows use `0`.
pub object_id: i64,
/// Written to the `ip_address` column.
pub ip_address: Option<&'a str>,
/// Written to the `summary` column.
pub summary: String,
/// Written to the `correlation_id` column.
pub correlation_id: Option<&'a str>,
/// Written to the `session_id` column.
pub session_id: Option<i64>,
/// Written to the `metadata` column after `actor_user_id` (if any) has
/// been merged in by `build_persisted_metadata`.
pub metadata: Option<serde_json::Value>,
/// When set, stored inside `metadata` under the `"actor_user_id"` key
/// (only if `metadata` is absent or a JSON object).
pub actor_user_id: Option<i64>,
/// When set, its `as_str()` value is persisted in `action_type` instead
/// of the [`ActionType`] string.
pub event: Option<AuditEvent>,
}
impl<'a> LogEntry<'a> {
pub fn new(user_id: i64, action_type: ActionType, model_name: &'a str, object_id: i64) -> Self {
Self {
user_id,
action_type,
model_name,
object_id,
ip_address: None,
summary: String::new(),
correlation_id: None,
session_id: None,
metadata: None,
actor_user_id: None,
event: None,
}
}
pub fn with_actor(mut self, actor_user_id: i64) -> Self {
self.actor_user_id = Some(actor_user_id);
self
}
pub fn with_event(mut self, event: AuditEvent) -> Self {
self.event = Some(event);
self
}
pub(crate) fn resolved_action_type(&self) -> &'static str {
match self.event {
Some(e) => e.as_str(),
None => self.action_type.as_str(),
}
}
}
/// Produces the JSON value stored in the `metadata` column.
///
/// * No actor: `metadata` is returned untouched.
/// * Actor, no metadata: a fresh `{"actor_user_id": <id>}` object.
/// * Actor and object metadata: the `"actor_user_id"` key is inserted,
///   replacing any existing value under that key.
/// * Actor and non-object metadata: a warning is logged and the metadata is
///   returned unchanged (the actor is not stored).
fn build_persisted_metadata(
    metadata: Option<serde_json::Value>,
    actor_user_id: Option<i64>,
) -> Option<serde_json::Value> {
    match (actor_user_id, metadata) {
        (None, untouched) => untouched,
        (Some(actor), None) => Some(serde_json::json!({ "actor_user_id": actor })),
        (Some(actor), Some(mut value)) => {
            match value.as_object_mut() {
                Some(fields) => {
                    fields.insert("actor_user_id".to_string(), serde_json::json!(actor));
                }
                // Scalars and arrays have no place to hold the key.
                None => log::warn!(
                    "audit::record: actor_user_id={actor} set but metadata is not a JSON object \
                     ({value:?}); writing row without merging actor — fix the call site"
                ),
            }
            Some(value)
        }
    }
}
pub async fn record(db: &Db, entry: LogEntry<'_>) -> Result<()> {
if entry.user_id <= 0 {
return Err(Error::Internal("admin audit: missing user_id".to_string()));
}
if entry.model_name.trim().is_empty() {
return Err(Error::Internal(
"admin audit: missing model_name".to_string(),
));
}
if entry.object_id < 0 {
return Err(Error::Internal(
"admin audit: object_id must be >= 0 (bulk rows use 0)".to_string(),
));
}
let now = Utc::now();
let action_type_str = entry.resolved_action_type();
let metadata = build_persisted_metadata(entry.metadata, entry.actor_user_id);
sqlx::query(
"INSERT INTO rustio_admin_actions
(user_id, action_type, model_name, object_id, timestamp, ip_address, summary,
correlation_id, session_id, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
)
.bind(entry.user_id)
.bind(action_type_str)
.bind(entry.model_name)
.bind(entry.object_id)
.bind(now)
.bind(entry.ip_address)
.bind(&entry.summary)
.bind(entry.correlation_id)
.bind(entry.session_id)
.bind(metadata.as_ref())
.execute(db.pool())
.await?;
Ok(())
}
/// Fine-grained audit event vocabulary.
///
/// When an event is attached to a [`LogEntry`] (see `LogEntry::with_event`),
/// `record` writes the event's [`AuditEvent::as_str`] value to the
/// `action_type` column in place of the coarse [`ActionType`] string.
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum AuditEvent {
UserCreated,
UserUpdated,
UserDeleted,
GroupCreated,
GroupUpdated,
GroupDeleted,
PasswordChangedSelf,
PasswordResetSelfRequest,
PasswordResetSelfConsume,
PasswordResetByOther,
ForcedPasswordChangeCompleted,
AccountLocked,
AccountUnlocked,
MfaEnabled,
MfaDisabled,
MfaResetByOther,
MfaCodeConsumed,
BackupCodesRegenerated,
SessionsRevokedSelf,
SessionsRevokedByOther,
SessionLogout,
LoginSucceeded,
LoginFailed,
/// The `emergency_recovery_is_cli_only` test in this file fails if any
/// other `.rs` file under this crate's `src/` mentions this variant in code.
EmergencyRecovery,
}
impl AuditEvent {
/// snake_case string for the variant; this is the value `record` persists
/// in `action_type` when the event is attached to a `LogEntry`.
///
/// The tests in this file pin every string, require them to be unique, and
/// require that none equals an `ActionType::as_str` value. The match has no
/// wildcard arm, so adding a variant without a string is a compile error.
pub const fn as_str(self) -> &'static str {
match self {
Self::UserCreated => "user_created",
Self::UserUpdated => "user_updated",
Self::UserDeleted => "user_deleted",
Self::GroupCreated => "group_created",
Self::GroupUpdated => "group_updated",
Self::GroupDeleted => "group_deleted",
Self::PasswordChangedSelf => "password_changed_self",
Self::PasswordResetSelfRequest => "password_reset_self_request",
Self::PasswordResetSelfConsume => "password_reset_self_consume",
Self::PasswordResetByOther => "password_reset_by_other",
Self::ForcedPasswordChangeCompleted => "forced_password_change_completed",
Self::AccountLocked => "account_locked",
Self::AccountUnlocked => "account_unlocked",
Self::MfaEnabled => "mfa_enabled",
Self::MfaDisabled => "mfa_disabled",
Self::MfaResetByOther => "mfa_reset_by_other",
Self::MfaCodeConsumed => "mfa_code_consumed",
Self::BackupCodesRegenerated => "backup_codes_regenerated",
Self::SessionsRevokedSelf => "sessions_revoked_self",
Self::SessionsRevokedByOther => "sessions_revoked_by_other",
Self::SessionLogout => "session_logout",
Self::LoginSucceeded => "login_succeeded",
Self::LoginFailed => "login_failed",
Self::EmergencyRecovery => "emergency_recovery",
}
}
}
pub async fn recent(
db: &Db,
limit: i64,
model_filter: Option<&str>,
action_filter: Option<&str>,
user_filter: Option<i64>,
) -> Result<Vec<AdminAction>> {
let mut sql = String::from(
"SELECT a.id, a.user_id, u.email AS user_email, a.action_type,
a.model_name, a.object_id, a.timestamp, a.ip_address, a.summary,
a.metadata
FROM rustio_admin_actions a
LEFT JOIN rustio_users u ON u.id = a.user_id",
);
let mut clauses: Vec<String> = Vec::new();
let mut param_idx: usize = 1;
if model_filter.is_some() {
clauses.push(format!("a.model_name = ${param_idx}"));
param_idx += 1;
}
if action_filter.is_some() {
clauses.push(format!("a.action_type = ${param_idx}"));
param_idx += 1;
}
if user_filter.is_some() {
clauses.push(format!("a.user_id = ${param_idx}"));
param_idx += 1;
}
if !clauses.is_empty() {
sql.push_str(" WHERE ");
sql.push_str(&clauses.join(" AND "));
}
sql.push_str(&format!(
" ORDER BY a.timestamp DESC, a.id DESC LIMIT ${param_idx}"
));
let mut q = sqlx::query(&sql);
if let Some(m) = model_filter {
q = q.bind(m);
}
if let Some(a) = action_filter {
q = q.bind(a);
}
if let Some(u) = user_filter {
q = q.bind(u);
}
q = q.bind(limit);
let rows = q.fetch_all(db.pool()).await?;
rows.iter().map(row_to_action).collect()
}
pub async fn for_object(db: &Db, model_name: &str, object_id: i64) -> Result<Vec<AdminAction>> {
let rows = sqlx::query(
"SELECT a.id, a.user_id, u.email AS user_email, a.action_type,
a.model_name, a.object_id, a.timestamp, a.ip_address, a.summary,
a.metadata
FROM rustio_admin_actions a
LEFT JOIN rustio_users u ON u.id = a.user_id
WHERE a.model_name = $1 AND a.object_id = $2
ORDER BY a.timestamp DESC, a.id DESC",
)
.bind(model_name)
.bind(object_id)
.fetch_all(db.pool())
.await?;
rows.iter().map(row_to_action).collect()
}
fn row_to_action(r: &sqlx::postgres::PgRow) -> Result<AdminAction> {
Ok(AdminAction {
id: r.try_get("id")?,
user_id: r.try_get("user_id")?,
user_email: r.try_get("user_email")?,
action_type: r.try_get("action_type")?,
model_name: r.try_get("model_name")?,
object_id: r.try_get("object_id")?,
timestamp: r.try_get("timestamp")?,
ip_address: r.try_get("ip_address")?,
summary: r.try_get("summary")?,
metadata: r.try_get("metadata").ok(),
})
}
#[cfg(test)]
mod tests {
use super::*;
// Hand-written list of `AuditEvent` variants that the tests below iterate over.
// NOTE(review): nothing visible here forces this list to grow when a variant is
// added to the enum — a new variant must be appended manually to be covered.
const ALL_AUDIT_EVENTS: &[AuditEvent] = &[
AuditEvent::UserCreated,
AuditEvent::UserUpdated,
AuditEvent::UserDeleted,
AuditEvent::GroupCreated,
AuditEvent::GroupUpdated,
AuditEvent::GroupDeleted,
AuditEvent::PasswordChangedSelf,
AuditEvent::PasswordResetSelfRequest,
AuditEvent::PasswordResetSelfConsume,
AuditEvent::PasswordResetByOther,
AuditEvent::ForcedPasswordChangeCompleted,
AuditEvent::AccountLocked,
AuditEvent::AccountUnlocked,
AuditEvent::MfaEnabled,
AuditEvent::MfaDisabled,
AuditEvent::MfaResetByOther,
AuditEvent::MfaCodeConsumed,
AuditEvent::BackupCodesRegenerated,
AuditEvent::SessionsRevokedSelf,
AuditEvent::SessionsRevokedByOther,
AuditEvent::SessionLogout,
AuditEvent::LoginSucceeded,
AuditEvent::LoginFailed,
AuditEvent::EmergencyRecovery,
];
/// No two listed events may share an `as_str()` value.
#[test]
fn audit_event_strings_are_unique() {
    let mut seen = std::collections::HashSet::new();
    for event in ALL_AUDIT_EVENTS.iter().copied() {
        let newly_added = seen.insert(event.as_str());
        assert!(newly_added, "duplicate as_str() for {event:?}");
    }
    assert_eq!(seen.len(), ALL_AUDIT_EVENTS.len());
}
/// Every event string is non-empty and made only of `a-z`, `0-9` and `_`.
#[test]
fn audit_event_strings_are_snake_case() {
    let allowed = |c: char| matches!(c, 'a'..='z' | '0'..='9' | '_');
    for event in ALL_AUDIT_EVENTS.iter().copied() {
        let text = event.as_str();
        assert!(!text.is_empty(), "{event:?} as_str is empty");
        assert!(
            text.chars().all(allowed),
            "{event:?}.as_str() = {text:?} is not snake_case"
        );
    }
}
/// Pins the string for `PasswordChangedSelf`.
#[test]
fn audit_event_password_changed_self_maps_correctly() {
    let stored = AuditEvent::PasswordChangedSelf.as_str();
    assert_eq!(stored, "password_changed_self");
}
/// Pins the exact string of every variant so a rename shows up as a test failure.
#[test]
fn audit_event_existing_variants_have_stable_strings() {
    let pinned: &[(AuditEvent, &str)] = &[
        (AuditEvent::UserCreated, "user_created"),
        (AuditEvent::UserUpdated, "user_updated"),
        (AuditEvent::UserDeleted, "user_deleted"),
        (AuditEvent::GroupCreated, "group_created"),
        (AuditEvent::GroupUpdated, "group_updated"),
        (AuditEvent::GroupDeleted, "group_deleted"),
        (AuditEvent::PasswordChangedSelf, "password_changed_self"),
        (
            AuditEvent::PasswordResetSelfRequest,
            "password_reset_self_request",
        ),
        (
            AuditEvent::PasswordResetSelfConsume,
            "password_reset_self_consume",
        ),
        (AuditEvent::PasswordResetByOther, "password_reset_by_other"),
        (
            AuditEvent::ForcedPasswordChangeCompleted,
            "forced_password_change_completed",
        ),
        (AuditEvent::AccountLocked, "account_locked"),
        (AuditEvent::AccountUnlocked, "account_unlocked"),
        (AuditEvent::MfaEnabled, "mfa_enabled"),
        (AuditEvent::MfaDisabled, "mfa_disabled"),
        (AuditEvent::MfaResetByOther, "mfa_reset_by_other"),
        (AuditEvent::MfaCodeConsumed, "mfa_code_consumed"),
        (
            AuditEvent::BackupCodesRegenerated,
            "backup_codes_regenerated",
        ),
        (AuditEvent::SessionsRevokedSelf, "sessions_revoked_self"),
        (
            AuditEvent::SessionsRevokedByOther,
            "sessions_revoked_by_other",
        ),
        (AuditEvent::SessionLogout, "session_logout"),
        (AuditEvent::LoginSucceeded, "login_succeeded"),
        (AuditEvent::LoginFailed, "login_failed"),
        (AuditEvent::EmergencyRecovery, "emergency_recovery"),
    ];
    for &(event, expected) in pinned {
        assert_eq!(event.as_str(), expected);
    }
}
/// `ActionType` and `AuditEvent` strings share the `action_type` column, so
/// the two vocabularies must be disjoint.
#[test]
fn action_type_and_audit_event_vocabularies_dont_collide() {
    let action_type_strs = [
        ActionType::Create.as_str(),
        ActionType::Update.as_str(),
        ActionType::Delete.as_str(),
    ];
    let mut vocabulary = std::collections::HashSet::new();

    for s in action_type_strs.iter().copied() {
        let fresh = vocabulary.insert(s);
        assert!(fresh, "duplicate ActionType string {s:?}");
    }
    for event in ALL_AUDIT_EVENTS.iter().copied() {
        let text = event.as_str();
        let fresh = vocabulary.insert(text);
        assert!(
            fresh,
            "AuditEvent::{:?} ({:?}) collides with ActionType",
            event, text
        );
    }

    let expected_total = action_type_strs.len() + ALL_AUDIT_EVENTS.len();
    assert_eq!(vocabulary.len(), expected_total);
}
/// Scans every `.rs` file under this crate's `src/` (except `admin/audit.rs`)
/// and fails if any line mentions `AuditEvent::EmergencyRecovery` before its
/// first `//`.
#[test]
fn emergency_recovery_is_cli_only() {
    let framework_src = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src");
    let self_file = framework_src.join("admin/audit.rs");
    let mut offenders: Vec<String> = Vec::new();

    walk_rs_files_admin_audit(&framework_src, &mut |path: &std::path::Path| {
        if path == self_file {
            return;
        }
        let content = match std::fs::read_to_string(path) {
            Ok(text) => text,
            Err(e) => panic!("read {}: {e}", path.display()),
        };
        let hits = content
            .lines()
            .enumerate()
            .filter(|(_, raw_line)| {
                // Everything from the first `//` onwards is treated as a comment.
                let code = raw_line.split("//").next().unwrap_or(raw_line);
                code.contains("AuditEvent::EmergencyRecovery")
            })
            .map(|(index, raw_line)| {
                format!("{}:{}: {}", path.display(), index + 1, raw_line.trim())
            });
        offenders.extend(hits);
    });

    assert!(
        offenders.is_empty(),
        "framework crate must not reference `AuditEvent::EmergencyRecovery` \
         in CODE (it is a CLI-only audit variant per \
         `DESIGN_R4_EMERGENCY.md` §10 D12). Move the emission to \
         crates/rustio-admin-cli/, or introduce a new AuditEvent variant \
         for the web-side action:\n {}",
        offenders.join("\n ")
    );
}
/// Scans every `.rs` file under this crate's `src/` (except `admin/audit.rs`)
/// for lines that look like an audit emission and also contain one of the
/// legacy quoted model names; any such line fails the test.
#[test]
fn model_name_uses_admin_slug_not_struct_name() {
    let framework_src = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src");
    let self_file = framework_src.join("admin/audit.rs");
    let legacy_patterns: [&str; 6] = [
        r#""User""#,
        r#""user""#,
        r#""Group""#,
        r#""group""#,
        r#""rustio_users""#,
        r#""rustio_groups""#,
    ];
    // A line is only inspected when it contains at least one of these.
    let emission_markers: [&str; 4] = [
        "LogEntry",
        "model_name:",
        "model_name ==",
        "model_name =",
    ];
    let mut offenders: Vec<String> = Vec::new();

    walk_rs_files_admin_audit(&framework_src, &mut |path: &std::path::Path| {
        if path == self_file {
            return;
        }
        let content = match std::fs::read_to_string(path) {
            Ok(text) => text,
            Err(e) => panic!("read {}: {e}", path.display()),
        };
        for (index, raw_line) in content.lines().enumerate() {
            // Everything from the first `//` onwards is treated as a comment.
            let code = raw_line.split("//").next().unwrap_or(raw_line);
            let looks_like_audit_emission =
                emission_markers.iter().any(|marker| code.contains(*marker));
            let mentions_legacy_name =
                legacy_patterns.iter().any(|pattern| code.contains(*pattern));
            if looks_like_audit_emission && mentions_legacy_name {
                offenders.push(format!(
                    "{}:{}: {}",
                    path.display(),
                    index + 1,
                    raw_line.trim()
                ));
            }
        }
    });

    assert!(
        offenders.is_empty(),
        "audit row emissions must write the admin slug (not struct \
         name / SQL table name) as `model_name`. The History page \
         renders this column as a URL slug; non-slug values 404. \
         See `VISIBILITY_AUDIT.md` F1.\n {}",
        offenders.join("\n ")
    );
}
/// Recursively visits every file under `root` whose extension is `rs`,
/// calling `visit` with its path.
///
/// This helper backs source-scanning guard tests, so it must never succeed
/// vacuously: any I/O failure (unreadable or missing directory, unreadable
/// entry, unknown file type) panics with the offending path instead of being
/// skipped. Entries are visited in sorted path order so that failure reports
/// are stable across platforms and runs.
///
/// # Panics
/// Panics when `root` (or any directory below it) cannot be listed, or when
/// an entry or its file type cannot be read.
fn walk_rs_files_admin_audit(root: &std::path::Path, visit: &mut dyn FnMut(&std::path::Path)) {
    let listing = std::fs::read_dir(root)
        .unwrap_or_else(|e| panic!("read_dir {}: {e}", root.display()));

    let mut children: Vec<(std::path::PathBuf, std::fs::FileType)> = listing
        .map(|entry| {
            let entry = entry
                .unwrap_or_else(|e| panic!("read_dir entry under {}: {e}", root.display()));
            let path = entry.path();
            let file_type = entry
                .file_type()
                .unwrap_or_else(|e| panic!("file_type {}: {e}", path.display()));
            (path, file_type)
        })
        .collect();
    // `read_dir` order is platform-dependent; sort for deterministic traversal.
    children.sort_by(|left, right| left.0.cmp(&right.0));

    for (path, file_type) in children {
        if file_type.is_dir() {
            walk_rs_files_admin_audit(&path, visit);
        } else if path.extension().and_then(|s| s.to_str()) == Some("rs") {
            visit(&path);
        }
    }
}
/// Without an event the `ActionType` string is persisted; with one, the
/// event's string takes over.
#[test]
fn log_entry_with_event_overrides_action_type_persistence() {
    let plain = LogEntry::new(1, ActionType::Update, "users", 1);
    assert_eq!(plain.resolved_action_type(), "update");

    let overrides = [
        (AuditEvent::PasswordChangedSelf, "password_changed_self"),
        (
            AuditEvent::PasswordResetSelfRequest,
            "password_reset_self_request",
        ),
        (
            AuditEvent::PasswordResetSelfConsume,
            "password_reset_self_consume",
        ),
    ];
    for (event, expected) in overrides.iter().copied() {
        let entry = LogEntry::new(1, ActionType::Update, "users", 1).with_event(event);
        assert_eq!(entry.resolved_action_type(), expected);
    }
}
/// A freshly built entry carries no event and resolves to its `ActionType` string.
#[test]
fn log_entry_default_event_is_none() {
    let fresh = LogEntry::new(1, ActionType::Create, "post", 99);
    assert_eq!(fresh.event, None);
    assert_eq!(fresh.resolved_action_type(), "create");
}
/// `with_actor` stores the given id in `actor_user_id`.
#[test]
fn log_entry_with_actor_sets_field() {
    let base = LogEntry::new(1, ActionType::Update, "users", 1);
    let attributed = base.with_actor(7);
    assert!(matches!(attributed.actor_user_id, Some(7)));
}
/// A freshly built entry has no actor.
#[test]
fn log_entry_default_actor_user_id_is_none() {
    let fresh = LogEntry::new(1, ActionType::Update, "users", 1);
    assert_eq!(fresh.actor_user_id, None);
}
/// With no actor the metadata passes through untouched, including a
/// pre-existing `actor_user_id` key and the `None` case.
#[test]
fn merge_returns_metadata_unchanged_when_no_actor() {
    let original = serde_json::json!({"reason": "x", "actor_user_id": 99});
    let passed_through = build_persisted_metadata(Some(original.clone()), None);
    assert_eq!(passed_through, Some(original));
    assert_eq!(build_persisted_metadata(None, None), None);
}
/// An actor without metadata yields a one-key object.
#[test]
fn merge_synthesizes_object_when_metadata_is_none() {
    let expected = serde_json::json!({"actor_user_id": 7});
    assert_eq!(build_persisted_metadata(None, Some(7)), Some(expected));
}
/// The actor key is added next to the keys already present.
#[test]
fn merge_inserts_into_existing_object() {
    let before = serde_json::json!({"reason": "support ticket", "mode": "email"});
    let after = serde_json::json!({
        "reason": "support ticket",
        "mode": "email",
        "actor_user_id": 7
    });
    assert_eq!(build_persisted_metadata(Some(before), Some(7)), Some(after));
}
/// The typed actor replaces an `actor_user_id` already present in the metadata.
#[test]
fn merge_typed_actor_wins_over_existing_metadata_key() {
    let before = serde_json::json!({"actor_user_id": 999, "extra": "x"});
    let after = serde_json::json!({"actor_user_id": 7, "extra": "x"});
    assert_eq!(build_persisted_metadata(Some(before), Some(7)), Some(after));
}
/// Numbers, arrays and strings cannot hold the actor key and come back unchanged.
#[test]
fn merge_passes_through_non_object_metadata_with_warning() {
    let non_objects = vec![
        serde_json::json!(42),
        serde_json::json!(["a", "b"]),
        serde_json::json!("scalar"),
    ];
    for input in non_objects {
        let out = build_persisted_metadata(Some(input.clone()), Some(7));
        assert_eq!(out, Some(input));
    }
}
/// `ActionType::parse` accepts exactly its three lowercase strings and
/// nothing from the `AuditEvent` vocabulary.
#[test]
fn legacy_action_type_parser_returns_none_on_unknown_strings() {
    let recognised = [
        ("create", ActionType::Create),
        ("update", ActionType::Update),
        ("delete", ActionType::Delete),
    ];
    for (text, expected) in recognised.iter().copied() {
        assert_eq!(ActionType::parse(text), Some(expected));
    }

    for event in ALL_AUDIT_EVENTS.iter().copied() {
        let text = event.as_str();
        assert!(
            ActionType::parse(text).is_none(),
            "ActionType::parse should not recognise AuditEvent string {:?}",
            text
        );
    }

    for rejected in ["garbage", "", "CREATE"].iter().copied() {
        assert!(ActionType::parse(rejected).is_none());
    }
}
}