mod redact;
mod schema;
use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
use crate::ModelDescriptor;
use crate::sqlx;
pub use redact::{primary_key_from_snapshot, redact_snapshot, snapshot_model};
pub use schema::AUDIT_TABLE_DDL;
pub(crate) use schema::ensure_audit_table;
pub(crate) async fn enqueue_audit_event<'e, E>(
executor: E,
event: &AuditEvent,
) -> Result<(), CoolError>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
let actor = serde_json::to_value(&event.actor)
.map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
sqlx::query(
"INSERT INTO cratestack_audit (\
event_id, schema_name, model, operation, primary_key, actor, \
tenant, before, after, request_id, occurred_at\
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
)
.bind(event.event_id)
.bind(&event.schema_name)
.bind(&event.model)
.bind(event.operation.as_str())
.bind(&event.primary_key)
.bind(actor)
.bind(event.tenant.as_deref())
.bind(event.before.as_ref())
.bind(event.after.as_ref())
.bind(event.request_id.as_deref())
.bind(event.occurred_at)
.execute(executor)
.await
.map(|_| ())
.map_err(|error| CoolError::Database(error.to_string()))
}
pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
AuditActor {
id: ctx.principal_actor_id().map(|s| s.to_owned()),
claims: ctx.audit_claims_snapshot(),
ip: ctx.client_ip().map(|s| s.to_owned()),
}
}
pub(crate) fn build_audit_event<M, PK>(
descriptor: &'static ModelDescriptor<M, PK>,
operation: AuditOperation,
before: Option<serde_json::Value>,
after: Option<serde_json::Value>,
ctx: &CoolContext,
) -> AuditEvent {
let primary_key = after
.as_ref()
.or(before.as_ref())
.map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
.unwrap_or(serde_json::Value::Null);
let before = before.map(|mut snapshot| {
redact_snapshot(
&mut snapshot,
descriptor.pii_columns,
descriptor.sensitive_columns,
);
snapshot
});
let after = after.map(|mut snapshot| {
redact_snapshot(
&mut snapshot,
descriptor.pii_columns,
descriptor.sensitive_columns,
);
snapshot
});
AuditEvent {
event_id: uuid::Uuid::new_v4(),
schema_name: String::new(),
model: descriptor.schema_name.to_owned(),
operation,
primary_key,
actor: actor_from_context(ctx),
tenant: ctx.tenant_id().map(|s| s.to_owned()),
before,
after,
request_id: ctx.request_id().map(|s| s.to_owned()),
occurred_at: chrono::Utc::now(),
}
}
pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
executor: E,
descriptor: &'static ModelDescriptor<M, PK>,
id: PK,
) -> Result<Option<M>, CoolError>
where
E: sqlx::Executor<'e, Database = sqlx::Postgres>,
for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
{
let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
query.push(descriptor.select_projection());
query.push(" FROM ").push(descriptor.table_name);
query
.push(" WHERE ")
.push(descriptor.primary_key)
.push(" = ");
query.push_bind(id);
query.push(" FOR UPDATE");
query
.build_query_as::<M>()
.fetch_optional(executor)
.await
.map_err(|error| CoolError::Database(error.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn audit_operation_string_is_lowercase() {
assert_eq!(AuditOperation::Create.as_str(), "create");
assert_eq!(AuditOperation::Update.as_str(), "update");
assert_eq!(AuditOperation::Delete.as_str(), "delete");
}
}