1mod redact;
8mod schema;
9
10use cratestack_core::{AuditActor, AuditEvent, AuditOperation, CoolContext, CoolError};
11
12use crate::ModelDescriptor;
13use crate::sqlx;
14
15pub use redact::{primary_key_from_snapshot, redact_snapshot, snapshot_model};
16pub use schema::AUDIT_TABLE_DDL;
17pub(crate) use schema::ensure_audit_table;
18
19pub(crate) async fn enqueue_audit_event<'e, E>(
22 executor: E,
23 event: &AuditEvent,
24) -> Result<(), CoolError>
25where
26 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
27{
28 let actor = serde_json::to_value(&event.actor)
29 .map_err(|error| CoolError::Codec(format!("encode audit actor: {error}")))?;
30 sqlx::query(
31 "INSERT INTO cratestack_audit (\
32 event_id, schema_name, model, operation, primary_key, actor, \
33 tenant, before, after, request_id, occurred_at\
34 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
35 )
36 .bind(event.event_id)
37 .bind(&event.schema_name)
38 .bind(&event.model)
39 .bind(event.operation.as_str())
40 .bind(&event.primary_key)
41 .bind(actor)
42 .bind(event.tenant.as_deref())
43 .bind(event.before.as_ref())
44 .bind(event.after.as_ref())
45 .bind(event.request_id.as_deref())
46 .bind(event.occurred_at)
47 .execute(executor)
48 .await
49 .map(|_| ())
50 .map_err(|error| CoolError::Database(error.to_string()))
51}
52
53pub(crate) fn actor_from_context(ctx: &CoolContext) -> AuditActor {
57 AuditActor {
58 id: ctx.principal_actor_id().map(|s| s.to_owned()),
59 claims: ctx.audit_claims_snapshot(),
60 ip: ctx.client_ip().map(|s| s.to_owned()),
61 }
62}
63
64pub(crate) fn build_audit_event<M, PK>(
69 descriptor: &'static ModelDescriptor<M, PK>,
70 operation: AuditOperation,
71 before: Option<serde_json::Value>,
72 after: Option<serde_json::Value>,
73 ctx: &CoolContext,
74) -> AuditEvent {
75 let primary_key = after
76 .as_ref()
77 .or(before.as_ref())
78 .map(|snapshot| primary_key_from_snapshot(snapshot, descriptor.primary_key))
79 .unwrap_or(serde_json::Value::Null);
80 let before = before.map(|mut snapshot| {
81 redact_snapshot(
82 &mut snapshot,
83 descriptor.pii_columns,
84 descriptor.sensitive_columns,
85 );
86 snapshot
87 });
88 let after = after.map(|mut snapshot| {
89 redact_snapshot(
90 &mut snapshot,
91 descriptor.pii_columns,
92 descriptor.sensitive_columns,
93 );
94 snapshot
95 });
96 AuditEvent {
97 event_id: uuid::Uuid::new_v4(),
98 schema_name: String::new(),
102 model: descriptor.schema_name.to_owned(),
103 operation,
104 primary_key,
105 actor: actor_from_context(ctx),
106 tenant: ctx.tenant_id().map(|s| s.to_owned()),
107 before,
108 after,
109 request_id: ctx.request_id().map(|s| s.to_owned()),
110 occurred_at: chrono::Utc::now(),
111 }
112}
113
114pub(crate) async fn fetch_for_audit<'e, E, M, PK>(
119 executor: E,
120 descriptor: &'static ModelDescriptor<M, PK>,
121 id: PK,
122) -> Result<Option<M>, CoolError>
123where
124 E: sqlx::Executor<'e, Database = sqlx::Postgres>,
125 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow>,
126 PK: Send + sqlx::Type<sqlx::Postgres> + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
127{
128 let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
129 query.push(descriptor.select_projection());
130 query.push(" FROM ").push(descriptor.table_name);
131 query
132 .push(" WHERE ")
133 .push(descriptor.primary_key)
134 .push(" = ");
135 query.push_bind(id);
136 query.push(" FOR UPDATE");
137 query
138 .build_query_as::<M>()
139 .fetch_optional(executor)
140 .await
141 .map_err(|error| CoolError::Database(error.to_string()))
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147
148 #[test]
149 fn audit_operation_string_is_lowercase() {
150 assert_eq!(AuditOperation::Create.as_str(), "create");
151 assert_eq!(AuditOperation::Update.as_str(), "update");
152 assert_eq!(AuditOperation::Delete.as_str(), "delete");
153 }
154}