Skip to main content

cratestack_sqlx/
audit.rs

1//! Postgres-backed audit log. Audit rows write inside the mutation's
2//! transaction — you can never see a committed row whose audit entry
3//! didn't also commit. Fan-out (Kafka/Redis pubsub) goes through
4//! [`cratestack_core::AuditSink`]; this module is the canonical DB
5//! record.
6
7mod redact;
8mod schema;
9
10use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
11
12use crate::ModelDescriptor;
13use crate::sqlx;
14
15pub use redact::{primary_key_from_snapshot, redact_snapshot, snapshot_model};
16pub use schema::AUDIT_TABLE_DDL;
17pub(crate) use schema::ensure_audit_table;
18
19/// Persist an audit event into the `cratestack_audit` table. Designed
20/// to run inside the same transaction as the mutation it describes.
21pub(crate) async fn enqueue_audit_event<'e, E>(
22    executor: E,
23    event: &AuditEvent,
24) -> Result<(), CoolError>
25where
26    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
27{
28    let actor = serde_json::to_value(&event.actor)
29        .map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
30    sqlx::query(
31        "INSERT INTO cratestack_audit (\
32            event_id, schema_name, model, operation, primary_key, actor, \
33            tenant, before, after, request_id, occurred_at\
34         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
35    )
36    .bind(event.event_id)
37    .bind(&event.schema_name)
38    .bind(&event.model)
39    .bind(event.operation.as_str())
40    .bind(&event.primary_key)
41    .bind(actor)
42    .bind(event.tenant.as_deref())
43    .bind(event.before.as_ref())
44    .bind(event.after.as_ref())
45    .bind(event.request_id.as_deref())
46    .bind(event.occurred_at)
47    .execute(executor)
48    .await
49    .map(|_| ())
50    .map_err(|error| CoolError::Database(error.to_string()))
51}
52
53/// Derive an [`AuditActor`] from the [`CoolContext`] active at
54/// mutation time. Banks generally want the principal's id, primary
55/// claims, and source IP if the transport recorded one.
56pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
57    AuditActor {
58        id: ctx.principal_actor_id().map(|s| s.to_owned()),
59        claims: ctx.audit_claims_snapshot(),
60        ip: ctx.client_ip().map(|s| s.to_owned()),
61    }
62}
63
64/// Build an `AuditEvent` for a mutation just performed in a tx. The
65/// caller passes the JSON snapshot(s) on hand (`before` for
66/// updates/deletes, `after` for creates/updates) so this helper stays
67/// decoupled from the SQL-row decoding path.
68pub(crate) fn build_audit_event<M, PK>(
69    descriptor: &'static ModelDescriptor<M, PK>,
70    operation: AuditOperation,
71    before: Option<serde_json::Value>,
72    after: Option<serde_json::Value>,
73    ctx: &CoolContext,
74) -> AuditEvent {
75    let primary_key = after
76        .as_ref()
77        .or(before.as_ref())
78        .map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
79        .unwrap_or(serde_json::Value::Null);
80    let before = before.map(|mut snapshot| {
81        redact_snapshot(
82            &mut snapshot,
83            descriptor.pii_columns,
84            descriptor.sensitive_columns,
85        );
86        snapshot
87    });
88    let after = after.map(|mut snapshot| {
89        redact_snapshot(
90            &mut snapshot,
91            descriptor.pii_columns,
92            descriptor.sensitive_columns,
93        );
94        snapshot
95    });
96    AuditEvent {
97        event_id: uuid::Uuid::new_v4(),
98        // descriptor.schema_name historically holds the Rust ident;
99        // surfaced as `model`. A schema-wide label will be wired in
100        // when the parser exposes one.
101        schema_name: String::new(),
102        model: descriptor.schema_name.to_owned(),
103        operation,
104        primary_key,
105        actor: actor_from_context(ctx),
106        tenant: ctx.tenant_id().map(|s| s.to_owned()),
107        before,
108        after,
109        request_id: ctx.request_id().map(|s| s.to_owned()),
110        occurred_at: chrono::Utc::now(),
111    }
112}
113
114/// Fetch the current state of a row for audit purposes, taking a
115/// row-level lock so concurrent mutations cannot race us. Bypasses
116/// read policies — audit reflects the actual database state, not the
117/// caller's filtered view.
118pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
119    executor: E,
120    descriptor: &'static ModelDescriptor<M, PK>,
121    id: PK,
122) -> Result<Option<M>, CoolError>
123where
124    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
125    for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
126    PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
127{
128    let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
129    query.push(descriptor.select_projection());
130    query.push(" FROM ").push(descriptor.table_name);
131    query
132        .push(" WHERE ")
133        .push(descriptor.primary_key)
134        .push(" = ");
135    query.push_bind(id);
136    query.push(" FOR UPDATE");
137    query
138        .build_query_as::<M>()
139        .fetch_optional(executor)
140        .await
141        .map_err(|error| CoolError::Database(error.to_string()))
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// The strings produced by `AuditOperation::as_str` are bound into the
    /// `operation` column, so pin their exact lowercase spelling.
    #[test]
    fn audit_operation_string_is_lowercase() {
        let expectations = [
            (AuditOperation::Create, "create"),
            (AuditOperation::Update, "update"),
            (AuditOperation::Delete, "delete"),
        ];

        for (operation, expected) in expectations {
            assert_eq!(operation.as_str(), expected);
        }
    }
}
154}