Skip to main content

systemprompt_security/authz/audit/
repository.rs

1//! `governance_decisions` insert primitive.
2//!
3//! Single canonical writer for the table. Both the extension's
4//! `POST /govern/authz` handler (for resolved decisions) and core's
5//! [`DbAuditSink`](super::DbAuditSink) (for webhook-fault, default-deny, and
6//! unrestricted-allow decisions) call this repository so there is exactly one
7//! SQL statement that knows the column layout.
8
9use sqlx::PgPool;
10use systemprompt_identifiers::Actor;
11
12use crate::authz::types::DecisionTag;
13use crate::policy::types::AccessScope;
14
/// Name of the Prometheus counter that is bumped each time an INSERT into
/// `governance_decisions` fails.
///
/// Kept as a `pub const` so alert rules and dashboards can refer to the metric
/// through this symbol instead of repeating the string literal.
pub const AUDIT_WRITE_FAILED_TOTAL: &str = "governance_audit_write_failed_total";
21
22#[derive(Debug)]
23pub struct GovernanceDecisionRecord<'a> {
24    pub id: &'a str,
25    pub actor: &'a Actor,
26    pub session_id: &'a str,
27    pub tool_name: &'a str,
28    pub agent_id: Option<&'a str>,
29    pub agent_scope: Option<AccessScope>,
30    pub decision: DecisionTag,
31    pub policy: &'a str,
32    pub reason: &'a str,
33    // JSON: governance audit blob — typed `DecisionAudit` on the writing side;
34    // payload shape is documented in CHANGELOG and rendered by the dashboard.
35    pub evaluated_rules: &'a serde_json::Value,
36    pub plugin_id: Option<&'a str>,
37    /// RFC 8693 delegation lineage in outermost-first order. Empty for
38    /// direct (non-delegated) tokens.
39    pub act_chain: &'a [Actor],
40}
41
42#[derive(Debug, Clone)]
43pub struct GovernanceDecisionRepository {
44    pool: std::sync::Arc<PgPool>,
45}
46
47impl GovernanceDecisionRepository {
48    pub const fn from_pool(pool: std::sync::Arc<PgPool>) -> Self {
49        Self { pool }
50    }
51
52    pub fn pool(&self) -> &PgPool {
53        &self.pool
54    }
55
56    pub async fn insert(&self, record: &GovernanceDecisionRecord<'_>) -> Result<(), sqlx::Error> {
57        insert_governance_decision(&self.pool, record).await
58    }
59}
60
61pub async fn insert_governance_decision(
62    pool: &PgPool,
63    record: &GovernanceDecisionRecord<'_>,
64) -> Result<(), sqlx::Error> {
65    let actor_kind = record.actor.kind.tag();
66    let actor_id = record.actor.kind.actor_id(&record.actor.user_id);
67    // Why: act_chain is `Vec<Actor>` which is unconditionally serde-compliant,
68    // so serialization failure is unreachable; falling back to `[]` keeps the
69    // audit row writable rather than dropping the entire governance record.
70    let act_chain =
71        serde_json::to_value(record.act_chain).unwrap_or_else(|_| serde_json::json!([]));
72    let result = sqlx::query!(
73        "INSERT INTO governance_decisions (id, user_id, session_id, tool_name, agent_id, \
74         agent_scope, decision, policy, reason, evaluated_rules, plugin_id, actor_kind, actor_id, \
75         act_chain) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
76        record.id,
77        record.actor.user_id.as_str(),
78        record.session_id,
79        record.tool_name,
80        record.agent_id,
81        record.agent_scope.map(AccessScope::as_str),
82        record.decision.as_str(),
83        record.policy,
84        record.reason,
85        record.evaluated_rules,
86        record.plugin_id,
87        actor_kind.as_str(),
88        actor_id,
89        act_chain,
90    )
91    .execute(pool)
92    .await;
93    if let Err(error) = &result {
94        // Why: callers run this inside `tokio::spawn` (fire-and-forget audit
95        // writes), so a swallowed error here is invisible to the HTTP
96        // response. Log + counter at the SQL boundary guarantees every drop
97        // surfaces regardless of caller.
98        tracing::error!(
99            error = %error,
100            actor_kind = actor_kind.as_str(),
101            actor_id,
102            policy = record.policy,
103            decision = record.decision.as_str(),
104            session_id = record.session_id,
105            "governance_decisions insert failed; audit row dropped"
106        );
107        metrics::counter!(
108            AUDIT_WRITE_FAILED_TOTAL,
109            "actor_kind" => actor_kind.as_str(),
110            "decision" => record.decision.as_str(),
111            "policy" => record.policy.to_owned(),
112        )
113        .increment(1);
114    }
115    result.map(|_| ())
116}