Skip to main content

cratestack_core/
audit.rs

//! Audit log primitives.
//!
//! The audit subsystem is split into a backend-agnostic record format
//! (defined here) and a sink trait. The canonical store is a table
//! inside the same database as the mutation, written inside the same
//! transaction so that audit events never drift from the data they
//! describe. Downstream fan-out (Kafka, Redis pubsub, HTTP webhook)
//! goes through an [`AuditSink`] implementation; the table itself
//! remains the source of truth for compliance review.
10
11use std::collections::BTreeMap;
12use std::sync::Arc;
13
14use serde::{Deserialize, Serialize};
15
16use crate::error::CoolError;
17use crate::value::Value;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "lowercase")]
21pub enum AuditOperation {
22    Create,
23    Update,
24    Delete,
25}
26
27impl AuditOperation {
28    pub const fn as_str(&self) -> &'static str {
29        match self {
30            AuditOperation::Create => "create",
31            AuditOperation::Update => "update",
32            AuditOperation::Delete => "delete",
33        }
34    }
35}
36
37#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
38pub struct AuditActor {
39    /// Actor identifier — typically the user id from the auth context.
40    /// Omit when the operation runs without an authenticated principal
41    /// (system jobs, migrations).
42    pub id: Option<String>,
43    /// Free-form claims captured from the auth context at the time of
44    /// the operation. Banks use this for role/scope replay during
45    /// forensics.
46    pub claims: BTreeMap<String, Value>,
47    /// Source IP recorded by the transport layer, if available.
48    pub ip: Option<String>,
49}
50
51#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
52pub struct AuditEvent {
53    pub event_id: uuid::Uuid,
54    /// Schema name as declared in the `.cstack` file — lets you scope
55    /// audit queries to a single service without inspecting model
56    /// strings.
57    pub schema_name: String,
58    /// Model name as declared in the schema (e.g. `Account`, `Transfer`).
59    pub model: String,
60    pub operation: AuditOperation,
61    pub primary_key: serde_json::Value,
62    pub actor: AuditActor,
63    /// Tenant identifier captured from `PrincipalContext.tenant.id`
64    /// when present. Banks running multi-tenant clusters use this to
65    /// scope per-tenant audit exports.
66    pub tenant: Option<String>,
67    pub before: Option<serde_json::Value>,
68    pub after: Option<serde_json::Value>,
69    /// W3C `traceparent`-style request id, if the transport layer
70    /// captured one. Useful for stitching audit rows to APM traces.
71    pub request_id: Option<String>,
72    pub occurred_at: chrono::DateTime<chrono::Utc>,
73}
74
75/// Pluggable audit sink. Implementations fan audit events out to
76/// downstream systems (Kafka topics, Redis pubsub, HTTP webhooks, S3
77/// buckets) for long-term retention or SIEM ingestion. The in-database
78/// audit table written by `cratestack_sqlx` remains the canonical
79/// record; sinks are best-effort projections.
80#[async_trait::async_trait]
81pub trait AuditSink: Send + Sync + 'static {
82    async fn record(&self, event: &AuditEvent) -> Result<(), CoolError>;
83}
84
85/// Default sink that does nothing. The in-database audit table is
86/// treated as authoritative; downstream consumers are added by
87/// wrapping a different sink (or composing several).
88#[derive(Debug, Clone, Default)]
89pub struct NoopAuditSink;
90
91#[async_trait::async_trait]
92impl AuditSink for NoopAuditSink {
93    async fn record(&self, _event: &AuditEvent) -> Result<(), CoolError> {
94        Ok(())
95    }
96}
97
98/// Fan an audit event out to multiple sinks. Errors from any
99/// individual sink are aggregated into [`CoolError::Internal`] so a
100/// single failing downstream does not silently swallow problems with
101/// the others.
102pub struct MulticastAuditSink {
103    sinks: Vec<Arc<dyn AuditSink>>,
104}
105
106impl MulticastAuditSink {
107    pub fn new(sinks: Vec<Arc<dyn AuditSink>>) -> Self {
108        Self { sinks }
109    }
110}
111
112#[async_trait::async_trait]
113impl AuditSink for MulticastAuditSink {
114    async fn record(&self, event: &AuditEvent) -> Result<(), CoolError> {
115        let mut errors = Vec::new();
116        for sink in &self.sinks {
117            if let Err(error) = sink.record(event).await {
118                errors.push(error);
119            }
120        }
121        if errors.is_empty() {
122            Ok(())
123        } else {
124            Err(CoolError::Internal(format!(
125                "{} audit sink(s) failed: {}",
126                errors.len(),
127                errors
128                    .iter()
129                    .map(|e| e.detail().unwrap_or("(no detail)").to_owned())
130                    .collect::<Vec<_>>()
131                    .join("; "),
132            )))
133        }
134    }
135}
136
137/// Transaction isolation level requested by a procedure via
138/// `@isolation(...)`. Mirrors the PostgreSQL spec: lower variants
139/// tolerate more anomalies, higher ones cost more under contention.
140/// Banks running multi-row updates (transfers, postings) typically
141/// pick `Serializable` and pair it with retry-on-serialization-failure.
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
143pub enum TransactionIsolation {
144    ReadCommitted,
145    RepeatableRead,
146    Serializable,
147}
148
149impl TransactionIsolation {
150    pub fn parse(value: &str) -> Result<Self, CoolError> {
151        match value.trim().to_ascii_lowercase().as_str() {
152            "read_committed" | "read committed" => Ok(Self::ReadCommitted),
153            "repeatable_read" | "repeatable read" => Ok(Self::RepeatableRead),
154            "serializable" => Ok(Self::Serializable),
155            other => Err(CoolError::Validation(format!(
156                "unknown transaction isolation level '{other}'; expected one of \
157                 'read_committed', 'repeatable_read', 'serializable'",
158            ))),
159        }
160    }
161
162    pub const fn as_sql(&self) -> &'static str {
163        match self {
164            Self::ReadCommitted => "READ COMMITTED",
165            Self::RepeatableRead => "REPEATABLE READ",
166            Self::Serializable => "SERIALIZABLE",
167        }
168    }
169}