use serde_json::{Map, Value};
use crate::sql::sqlx::{self, postgres::PgRow, PgPool, Row};
#[derive(Debug, Clone)]
pub enum AuditSource {
System,
User { id: String },
Custom(String),
}
impl AuditSource {
#[must_use]
pub fn as_token(&self) -> String {
match self {
Self::System => "system".to_owned(),
Self::User { id } => format!("user:{id}"),
Self::Custom(s) => s.clone(),
}
}
}
impl Default for AuditSource {
fn default() -> Self {
Self::System
}
}
tokio::task_local! {
pub static AUDIT_SOURCE: AuditSource;
}
#[must_use]
pub fn current_source() -> AuditSource {
AUDIT_SOURCE
.try_with(Clone::clone)
.unwrap_or(AuditSource::System)
}
pub async fn with_source<F, T>(source: AuditSource, fut: F) -> T
where
F: std::future::Future<Output = T>,
{
AUDIT_SOURCE.scope(source, fut).await
}
#[derive(Debug, Clone)]
pub struct PendingEntry {
pub entity_table: &'static str,
pub entity_pk: String,
pub operation: AuditOp,
pub source: AuditSource,
pub changes: Value,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditOp {
Create,
Update,
Delete,
SoftDelete,
Restore,
}
impl AuditOp {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Create => "create",
Self::Update => "update",
Self::Delete => "delete",
Self::SoftDelete => "soft_delete",
Self::Restore => "restore",
}
}
}
pub async fn emit_one<'c, E>(executor: E, entry: &PendingEntry) -> Result<(), sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
sqlx::query(
r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES ($1, $2, $3, $4, $5)"#,
)
.bind(entry.entity_table)
.bind(&entry.entity_pk)
.bind(entry.operation.as_str())
.bind(entry.source.as_token())
.bind(&entry.changes)
.execute(executor)
.await?;
Ok(())
}
pub async fn emit_many<'c, E>(
executor: E,
entries: &[PendingEntry],
) -> Result<(), sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
if entries.is_empty() {
return Ok(());
}
let mut sql = String::from(
r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES "#,
);
let mut bind_idx = 1usize;
for (i, _) in entries.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
use std::fmt::Write as _;
let _ = write!(
sql,
"(${}, ${}, ${}, ${}, ${})",
bind_idx,
bind_idx + 1,
bind_idx + 2,
bind_idx + 3,
bind_idx + 4,
);
bind_idx += 5;
}
let mut q = sqlx::query(&sql);
for entry in entries {
q = q
.bind(entry.entity_table)
.bind(&entry.entity_pk)
.bind(entry.operation.as_str())
.bind(entry.source.as_token())
.bind(&entry.changes);
}
q.execute(executor).await?;
Ok(())
}
#[must_use]
pub fn diff_changes(before: &[(&str, Value)], after: &[(&str, Value)]) -> Value {
let mut out = Map::new();
for (name, after_val) in after {
let before_val = before
.iter()
.find(|(n, _)| n == name)
.map(|(_, v)| v.clone())
.unwrap_or(Value::Null);
if &before_val != after_val {
let mut entry = Map::new();
entry.insert("before".into(), before_val);
entry.insert("after".into(), after_val.clone());
out.insert((*name).into(), Value::Object(entry));
}
}
Value::Object(out)
}
#[must_use]
pub fn snapshot_changes(after: &[(&str, Value)]) -> Value {
let mut out = Map::new();
for (name, val) in after {
out.insert((*name).to_string(), val.clone());
}
Value::Object(out)
}
pub async fn fetch_for_entity(
pool: &PgPool,
entity_table: &str,
entity_pk: &str,
) -> Result<Vec<AuditEntry>, sqlx::Error> {
let rows: Vec<PgRow> = sqlx::query(
r#"SELECT "id", "entity_table", "entity_pk", "operation",
"source", "changes", "occurred_at"
FROM "rustango_audit_log"
WHERE "entity_table" = $1 AND "entity_pk" = $2
ORDER BY "occurred_at" DESC, "id" DESC"#,
)
.bind(entity_table)
.bind(entity_pk)
.fetch_all(pool)
.await?;
let mut out = Vec::with_capacity(rows.len());
for row in rows {
out.push(AuditEntry::from_row(&row)?);
}
Ok(out)
}
#[derive(Debug, Clone)]
pub struct AuditEntry {
pub id: i64,
pub entity_table: String,
pub entity_pk: String,
pub operation: String,
pub source: String,
pub changes: Value,
pub occurred_at: chrono::DateTime<chrono::Utc>,
}
impl AuditEntry {
fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
Ok(Self {
id: row.try_get("id")?,
entity_table: row.try_get("entity_table")?,
entity_pk: row.try_get("entity_pk")?,
operation: row.try_get("operation")?,
source: row.try_get("source")?,
changes: row.try_get("changes")?,
occurred_at: row.try_get("occurred_at")?,
})
}
}
pub const CREATE_TABLE_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS "rustango_audit_log" (
"id" BIGSERIAL PRIMARY KEY,
"entity_table" TEXT NOT NULL,
"entity_pk" TEXT NOT NULL,
"operation" TEXT NOT NULL,
"source" TEXT NOT NULL,
"changes" JSONB NOT NULL,
"occurred_at" TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS "rustango_audit_log_entity_idx"
ON "rustango_audit_log" ("entity_table", "entity_pk");
CREATE INDEX IF NOT EXISTS "rustango_audit_log_occurred_idx"
ON "rustango_audit_log" ("occurred_at" DESC);
"#;
pub async fn cleanup_older_than(
pool: &PgPool,
cutoff_days: i64,
) -> Result<u64, sqlx::Error> {
let cutoff = cutoff_days.max(0);
let result = sqlx::query(
r#"DELETE FROM "rustango_audit_log"
WHERE "occurred_at" < NOW() - ($1::int8 * INTERVAL '1 day')"#,
)
.bind(cutoff)
.execute(pool)
.await?;
Ok(result.rows_affected())
}
pub async fn cleanup_keep_last_n(
pool: &PgPool,
keep: i64,
) -> Result<u64, sqlx::Error> {
let keep = keep.max(0);
let result = sqlx::query(
r#"DELETE FROM "rustango_audit_log" WHERE "id" IN (
SELECT "id" FROM (
SELECT "id",
ROW_NUMBER() OVER (
PARTITION BY "entity_table", "entity_pk"
ORDER BY "occurred_at" DESC, "id" DESC
) AS _rn
FROM "rustango_audit_log"
) ranked
WHERE _rn > $1
)"#,
)
.bind(keep)
.execute(pool)
.await?;
Ok(result.rows_affected())
}
pub async fn ensure_table(pool: &PgPool) -> Result<(), sqlx::Error> {
for stmt in CREATE_TABLE_SQL.split(';') {
let trimmed = stmt.trim();
if trimmed.is_empty() {
continue;
}
sqlx::query(trimmed).execute(pool).await?;
}
Ok(())
}