Skip to main content

ai_memory/governance/
deferred_audit.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 Policy-Engine Item 3 — deferred audit-log queue for the
5//! storage governance pre-write hook.
6//!
7//! # The gap this closes
8//!
9//! The substrate's `GOVERNANCE_PRE_WRITE` hook (installed in
10//! `daemon_runtime::bootstrap_serve`, consulted from every
11//! `storage::insert*` call) ran
12//! [`super::agent_action::check_agent_action_no_audit`] — the
13//! `_no_audit` suffix is there because emitting a `signed_events` row
14//! from INSIDE the in-flight INSERT transaction would re-enter the
15//! same `Connection` and deadlock under the substrate's
16//! `Arc<Mutex<Connection>>` lock.
17//!
18//! Consequence: **storage refusals were typed-but-not-cryptographically-logged**.
19//! Other paths (the audited [`super::agent_action::check_agent_action`]
20//! variant) DO chain-log via `signed_events`. That asymmetry was the
21//! known gap in the bypass-impossibility audit story.
22//!
23//! # The fix — deferred chain-log
24//!
25//! This module ships a process-wide `DeferredAuditQueue`:
26//!
27//! 1. The storage hook captures the refusal verdict + agent identity
28//!    + canonical action payload as a [`DeferredAuditEvent`] and
29//!    submits it to the queue via [`DeferredAuditQueue::submit`]
30//!    (non-blocking, never panics).
//! 2. A background drainer task ([`spawn_drainer_task`]) owns a FRESH
//!    `Connection` (opened against the same `db_path` but NOT the
//!    substrate writer's connection), so it can never re-enter the
//!    substrate's `Arc<Mutex<Connection>>` lock. SQLite (WAL included)
//!    still admits only one writer at a time: a drainer append that
//!    overlaps an in-flight `storage::insert` transaction waits for it
//!    or fails with a busy error, depending on the connection's busy
//!    handling — but it cannot deadlock.
37//! 3. For every received event, the drainer appends a
38//!    `governance.refusal` row to `signed_events`. The chain-log
39//!    property closes.
40//!
41//! # Supervisor pattern
42//!
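//! The drainer task is wrapped by [`spawn_supervised_drainer`], which
//! awaits the drainer's `JoinHandle` and counts panics in
//! `drainer_panics`. A panic drops the channel receiver together with
//! the drainer task, so the supervisor cannot respawn it. Instead it
//! logs an operator-facing error, and later submits fail (counted in
//! `send_failures`) until the queue is rebuilt or the daemon restarts.
//! A drainer that died silently would drop the audit chain — a
//! regression worse than the original gap — so the failure is made loud.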
48//!
49//! # Backpressure / lossiness
50//!
51//! The channel is `tokio::sync::mpsc::unbounded_channel` by design:
52//!
//! - Refusals are rare (a properly-configured fleet refuses
//!   << 1% of writes).
//! - A bounded channel would force a choice between blocking the
//!   storage write path when the buffer is full and dropping the
//!   event — and a dropped audit row IS a security regression we
//!   cannot accept.
//! - The trade-off is that memory is NOT bounded. A single SQLite
//!   append in WAL mode takes on the order of 25-100 microseconds, so
//!   one drainer sustains roughly 10k-40k appends/second; a refusal
//!   flood above that rate grows the in-memory backlog without limit.
//!   Submitting never blocks the storage write path either way.
62//!
63//! # Graceful shutdown
64//!
65//! [`DeferredAuditQueue::close_and_flush`] drops the sender and
66//! awaits the supervisor task to terminate. The drainer drains every
67//! still-buffered event before exiting; pending events MUST land in
68//! `signed_events` before the daemon's tokio runtime is torn down,
69//! or the chain-log property is broken.
70
71use std::path::{Path, PathBuf};
72use std::sync::Arc;
73use std::sync::atomic::{AtomicU64, Ordering};
74
75use anyhow::{Context, Result};
76use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
77use tokio::task::JoinHandle;
78
79use crate::governance::agent_action::{AgentAction, Decision};
80use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
81
/// Cluster-C SEC-3 (issue #767) — retry budget for one audit append
/// that fails with `SQLITE_CONSTRAINT_UNIQUE` on the
/// `idx_signed_events_sequence` index (a concurrent writer claimed the
/// same `sequence` value first).
///
/// The sink makes one initial attempt plus up to this many retries. The
/// BEGIN IMMEDIATE wrap inside `append_signed_event` should make real
/// contention rare, so this is a safety belt. Once the budget is spent
/// the event is written to `signed_events_dlq`, so the audit drop is
/// never silent.
const APPEND_UNIQUE_RACE_MAX_RETRIES: usize = 5;

/// `signed_events.event_type` value for refusals flushed by the
/// deferred-audit drainer.
///
/// Audit dashboards key on this string to tell storage-hook refusals
/// apart from the `governance.check` rows written by the audited
/// `check_agent_action` path.
pub const GOVERNANCE_REFUSAL_EVENT_TYPE: &str = "governance.refusal";
96
97/// One refusal captured by the storage hook, awaiting flush to the
98/// `signed_events` chain.
99///
100/// All fields are owned (no borrows) so the event can cross the mpsc
101/// channel boundary without lifetime gymnastics. `payload_bytes` is
102/// the canonical-JSON encoding of `{action, decision}` — the same
103/// shape the audited path commits to via
104/// `agent_action::emit_check_event`. The drainer hashes this on the
105/// way to `signed_events.payload_hash`.
106#[derive(Debug, Clone)]
107pub struct DeferredAuditEvent {
108    /// Agent identity at the moment of refusal (resolved from request
109    /// or process context). Lands in `signed_events.agent_id`.
110    pub agent_id: String,
111    /// The action that was refused. Cloned from the hook input.
112    pub action: AgentAction,
113    /// The verdict — must be a `Refuse` variant; non-refusal events
114    /// do not enter the queue (the submit helpers gate on
115    /// `Decision::is_refusal`).
116    pub decision: Decision,
117    /// Wall-clock timestamp of refusal. Lands in
118    /// `signed_events.timestamp` as RFC3339.
119    pub timestamp: chrono::DateTime<chrono::Utc>,
120}
121
122impl DeferredAuditEvent {
123    /// Build a deferred event from the hook's three inputs. Returns
124    /// `None` when `decision` is not a refusal — callers should
125    /// only submit refusals to the queue (Allow / Warn paths do not
126    /// chain-log a refusal row).
127    #[must_use]
128    pub fn from_refusal(agent_id: &str, action: &AgentAction, decision: &Decision) -> Option<Self> {
129        if !decision.is_refusal() {
130            return None;
131        }
132        Some(Self {
133            agent_id: agent_id.to_string(),
134            action: action.clone(),
135            decision: decision.clone(),
136            timestamp: chrono::Utc::now(),
137        })
138    }
139
140    /// Extract the rule_id from the refusal verdict. Used by the
141    /// drainer to surface the firing rule in the audit row's
142    /// canonical payload.
143    #[must_use]
144    pub fn rule_id(&self) -> Option<&str> {
145        match &self.decision {
146            Decision::Refuse { rule_id, .. } => Some(rule_id.as_str()),
147            _ => None,
148        }
149    }
150
151    /// Extract the refusal reason from the verdict (verbatim
152    /// operator-authored string).
153    #[must_use]
154    pub fn reason(&self) -> Option<&str> {
155        match &self.decision {
156            Decision::Refuse { reason, .. } => Some(reason.as_str()),
157            _ => None,
158        }
159    }
160
161    /// Canonical JSON shape the drainer hashes for
162    /// `signed_events.payload_hash`. Stable across versions: a
163    /// flat object with `action`, `decision`, `agent_id`,
164    /// `timestamp` keys — same outline as
165    /// `agent_action::emit_check_event` plus the agent + timestamp.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error only if `serde_json` cannot serialize the
170    /// action variant (in practice never happens for the canonical
171    /// AgentAction shapes).
172    pub fn canonical_bytes(&self) -> Result<Vec<u8>> {
173        let canonical = serde_json::json!({
174            "action": self.action,
175            "decision": self.decision,
176            "agent_id": self.agent_id,
177            "timestamp": self.timestamp.to_rfc3339(),
178        });
179        serde_json::to_vec(&canonical).context("DeferredAuditEvent::canonical_bytes")
180    }
181}
182
/// Shared counters surfaced for observability.
///
/// Every field is an `Arc<AtomicU64>`, so cloning is cheap (reference
/// count bumps only) and all clones observe the same counts. The public
/// read path is on the queue handle. Counters use `Ordering::Relaxed`:
/// each is an independent tally, not a synchronization point.
#[derive(Debug, Clone, Default)]
pub struct DeferredAuditMetrics {
    /// Number of submit attempts. Includes attempts that failed
    /// because the receiver was already closed (those are also counted
    /// in `send_failures`).
    pub submitted: Arc<AtomicU64>,
    /// Number of events the drainer successfully appended to
    /// `signed_events`.
    pub appended: Arc<AtomicU64>,
    /// Number of submit attempts that failed because the receiver
    /// was already closed (drainer dropped / shutdown raced).
    pub send_failures: Arc<AtomicU64>,
    /// Number of events the drainer could NOT chain-log. The drainer
    /// bumps this both when the sink parked the event in the DLQ
    /// (`AppendOutcome::DlqLanded`) and when the sink returned `Err`
    /// (no DLQ landing either), and logs each case. When the sink
    /// shares this metrics handle, subtracting `dlq_landed` isolates the
    /// hard failures. A non-zero value indicates DB pressure, chain-head
    /// contention, or corruption.
    pub append_failures: Arc<AtomicU64>,
    /// Number of drainer panics observed by the supervisor. Should be
    /// zero in healthy operation.
    pub drainer_panics: Arc<AtomicU64>,
    /// Cluster-C SEC-3 (issue #767) — number of events that landed in
    /// the `signed_events_dlq` table after exhausting the
    /// `SQLITE_CONSTRAINT_UNIQUE` retry budget or hitting an
    /// unrecoverable non-race error. Only bumped by sinks built with a
    /// metrics handle (e.g. `SqliteSignedEventsSink::with_metrics`).
    /// Surfaced via the capabilities-v3 envelope's
    /// `approval.deferred_audit_dlq_size` field (live count query) and
    /// via this counter (cumulative since process boot).
    pub dlq_landed: Arc<AtomicU64>,
    /// Cluster-C SEC-3 (issue #767) — cumulative number of
    /// `SQLITE_CONSTRAINT_UNIQUE` race retries observed by the
    /// production sink. Every retry indicates the chain-head read
    /// raced a sibling-connection writer; a sustained non-zero value
    /// hints at write-contention pressure on the audit chain (e.g. a
    /// burst of refusals while the substrate writer is also churning
    /// out `memory_link.created` rows).
    pub unique_race_retries: Arc<AtomicU64>,
}

impl DeferredAuditMetrics {
    /// Number of submit attempts since process boot.
    #[must_use]
    pub fn submitted_count(&self) -> u64 {
        self.submitted.load(Ordering::Relaxed)
    }

    /// Number of events successfully chain-logged since process boot.
    #[must_use]
    pub fn appended_count(&self) -> u64 {
        self.appended.load(Ordering::Relaxed)
    }

    /// Number of submit failures (receiver dropped).
    #[must_use]
    pub fn send_failure_count(&self) -> u64 {
        self.send_failures.load(Ordering::Relaxed)
    }

    /// Number of events the drainer could not chain-log (DLQ landings
    /// plus hard sink errors).
    #[must_use]
    pub fn append_failure_count(&self) -> u64 {
        self.append_failures.load(Ordering::Relaxed)
    }

    /// Number of supervisor-observed drainer panics.
    #[must_use]
    pub fn panic_count(&self) -> u64 {
        self.drainer_panics.load(Ordering::Relaxed)
    }

    /// Cluster-C SEC-3 — cumulative number of events that landed in
    /// `signed_events_dlq` since process boot.
    #[must_use]
    pub fn dlq_landed_count(&self) -> u64 {
        self.dlq_landed.load(Ordering::Relaxed)
    }

    /// Cluster-C SEC-3 — cumulative number of UNIQUE-constraint race
    /// retries observed by the production sink since process boot.
    #[must_use]
    pub fn unique_race_retry_count(&self) -> u64 {
        self.unique_race_retries.load(Ordering::Relaxed)
    }
}
266
267/// Producer-side handle. Cloneable so multiple callsites (HTTP
268/// handler, MCP handler, internal substrate writer) all share one
269/// queue.
270#[derive(Clone)]
271pub struct DeferredAuditQueue {
272    sender: UnboundedSender<DeferredAuditEvent>,
273    metrics: DeferredAuditMetrics,
274}
275
276impl DeferredAuditQueue {
277    /// Create a fresh queue + uninstalled receiver. The receiver
278    /// MUST be passed to [`spawn_drainer_task`] (or
279    /// [`spawn_supervised_drainer`]) for events to land — submits
280    /// against an unspawned receiver accumulate in the channel
281    /// buffer indefinitely until the receiver is consumed or
282    /// dropped.
283    #[must_use]
284    pub fn new() -> (Self, UnboundedReceiver<DeferredAuditEvent>) {
285        let (sender, receiver) = mpsc::unbounded_channel();
286        let queue = Self {
287            sender,
288            metrics: DeferredAuditMetrics::default(),
289        };
290        (queue, receiver)
291    }
292
293    /// Submit a refusal event. Non-blocking. Never panics — if the
294    /// receiver is closed the metric counter is bumped and a
295    /// tracing::warn is emitted, but the caller path is unaffected.
296    /// Returns `true` when the event was queued, `false` when the
297    /// receiver was already closed.
298    pub fn submit(&self, event: DeferredAuditEvent) -> bool {
299        self.metrics.submitted.fetch_add(1, Ordering::Relaxed);
300        match self.sender.send(event) {
301            Ok(()) => true,
302            Err(_) => {
303                self.metrics.send_failures.fetch_add(1, Ordering::Relaxed);
304                tracing::warn!(
305                    "deferred_audit_queue: submit failed (drainer receiver closed); \
306                     audit chain row LOST for this refusal"
307                );
308                false
309            }
310        }
311    }
312
313    /// Convenience: build + submit a refusal from the three hook
314    /// inputs. Returns `true` when an event was actually enqueued
315    /// (i.e. the verdict was a refusal AND the receiver was open).
316    pub fn submit_refusal(
317        &self,
318        agent_id: &str,
319        action: &AgentAction,
320        decision: &Decision,
321    ) -> bool {
322        let Some(event) = DeferredAuditEvent::from_refusal(agent_id, action, decision) else {
323            return false;
324        };
325        self.submit(event)
326    }
327
328    /// Observability handle. Clone-cheap; safe to expose to readers
329    /// (Prometheus scrape, MCP `governance_state` tool, etc.).
330    #[must_use]
331    pub fn metrics(&self) -> DeferredAuditMetrics {
332        self.metrics.clone()
333    }
334
335    /// True when the drainer receiver is still attached. False when
336    /// the supervisor task and its receiver have both terminated
337    /// (shutdown complete).
338    #[must_use]
339    pub fn is_open(&self) -> bool {
340        !self.sender.is_closed()
341    }
342}
343
344// ---------------------------------------------------------------------------
345// Drainer / supervisor
346// ---------------------------------------------------------------------------
347
/// Per-event verdict a [`DeferredAuditSink`] hands back to the drainer.
///
/// Cluster-C SEC-3 (issue #767). Together with the `Err` case of
/// [`DeferredAuditSink::append`], this gives three outcomes, each tied
/// to the operator-facing counters in [`DeferredAuditMetrics`]:
///
/// * [`AppendOutcome::Appended`]: the row is in `signed_events` and the
///   chain advanced by one. The drainer bumps `appended`.
/// * [`AppendOutcome::DlqLanded`]: the race-retry budget ran out, or a
///   non-race error occurred, and the event was written to
///   `signed_events_dlq` with its failure reason. The chain did NOT
///   advance, but the row can be recovered post-mortem. Sinks wired with
///   metrics bump `dlq_landed`; the drainer additionally bumps
///   `append_failures`.
/// * `Err(_)` from `append`: not even the DLQ write succeeded (e.g. the
///   DB file is gone or the schema does not match). The drainer bumps
///   `append_failures`. The chain-log property is broken for this event
///   and an operator must step in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendOutcome {
    /// The event was appended to `signed_events`.
    Appended,
    /// The event was parked in `signed_events_dlq` instead, because
    /// retries were exhausted or a non-race error occurred.
    DlqLanded,
}
373
374/// Sink trait: an abstraction over "where the drainer writes the
375/// audit row". The production wiring opens a fresh SQLite
376/// `Connection` per drainer task and writes through `signed_events`
377/// with DLQ fallback; tests substitute a mock sink to assert
378/// per-event behavior or inject panics for supervisor-recovery
379/// coverage.
380///
381/// `append` MUST be `&mut` so a test sink can record events into an
382/// owned `Vec` without interior mutability. Production sinks
383/// (SQLite-backed) hold their state behind the impl.
384pub trait DeferredAuditSink: Send + 'static {
385    /// Persist one event.
386    ///
387    /// # Errors
388    ///
389    /// Returns `Err` only when the sink cannot even land the event in
390    /// its DLQ surface — a genuinely unrecoverable case (DB file gone,
391    /// schema mismatch). Soft failures (race retries, unrecoverable
392    /// `signed_events` INSERT followed by successful DLQ land) are
393    /// reported via [`AppendOutcome::DlqLanded`].
394    fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome>;
395}
396
397/// Production sink: opens a fresh SQLite `Connection` against the
398/// daemon's database path and appends a `governance.refusal` row to
399/// `signed_events` for each event.
400///
401/// One `Connection` per drainer task — NOT shared with the substrate
402/// writer. SQLite WAL mode lets the drainer's appends proceed in
403/// parallel with the writer's INSERTs without lock contention.
404///
405/// # Cluster-C SEC-3 (issue #767) — race-retry + DLQ
406///
407/// The sink wraps `append_signed_event` in a bounded retry loop. A
408/// `SQLITE_CONSTRAINT_UNIQUE` failure on the `idx_signed_events_sequence`
409/// UNIQUE index is the recoverable race-only signal (two writers
410/// computed the same `next_seq` simultaneously); the sink re-runs
411/// `append_signed_event` (which re-reads the chain head) up to
412/// [`APPEND_UNIQUE_RACE_MAX_RETRIES`] times. Any other rusqlite
413/// error, or a retry budget exhaustion, lands the event in
414/// `signed_events_dlq` and returns `Ok(AppendOutcome::DlqLanded)` so
415/// the drainer can update its counters without crashing.
416pub struct SqliteSignedEventsSink {
417    db_path: PathBuf,
418    conn: Option<rusqlite::Connection>,
419    metrics: Option<DeferredAuditMetrics>,
420}
421
422impl SqliteSignedEventsSink {
423    /// Construct without opening — the connection is opened lazily
424    /// on first `append`. This pattern lets the supervisor restart
425    /// the sink across drainer respawns without holding a closed
426    /// `Connection` handle.
427    ///
428    /// Built without a metrics handle; race-retry + DLQ-land events
429    /// will not increment any external counters. Use
430    /// [`Self::with_metrics`] (or [`spawn_supervised_drainer`] which
431    /// threads the metrics handle automatically) for full
432    /// observability.
433    #[must_use]
434    pub fn new(db_path: impl Into<PathBuf>) -> Self {
435        Self {
436            db_path: db_path.into(),
437            conn: None,
438            metrics: None,
439        }
440    }
441
442    /// Construct a sink wired to a metrics handle. The handle's
443    /// `dlq_landed` and `unique_race_retries` counters are bumped as
444    /// the sink observes those outcomes.
445    #[must_use]
446    pub fn with_metrics(db_path: impl Into<PathBuf>, metrics: DeferredAuditMetrics) -> Self {
447        Self {
448            db_path: db_path.into(),
449            conn: None,
450            metrics: Some(metrics),
451        }
452    }
453
454    fn ensure_conn(&mut self) -> Result<&rusqlite::Connection> {
455        if self.conn.is_none() {
456            let conn = crate::db::open(&self.db_path).with_context(|| {
457                format!(
458                    "SqliteSignedEventsSink: open {} for deferred-audit drainer",
459                    self.db_path.display()
460                )
461            })?;
462            self.conn = Some(conn);
463        }
464        // We just inserted Some — unwrap is safe.
465        Ok(self.conn.as_ref().expect("conn populated above"))
466    }
467
468    fn bump_dlq(&self) {
469        if let Some(m) = &self.metrics {
470            m.dlq_landed.fetch_add(1, Ordering::Relaxed);
471        }
472    }
473
474    fn bump_race_retry(&self) {
475        if let Some(m) = &self.metrics {
476            m.unique_race_retries.fetch_add(1, Ordering::Relaxed);
477        }
478    }
479}
480
481impl DeferredAuditSink for SqliteSignedEventsSink {
482    fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
483        let bytes = event
484            .canonical_bytes()
485            .context("SqliteSignedEventsSink: canonical_bytes")?;
486        let hash = payload_hash(&bytes);
487        // v0.7.0 #1035 — sign the payload_hash with the daemon's
488        // process-wide audit key when installed. The forensic JSONL
489        // sink and this SQL sink share the same key (installed by
490        // `audit::init`); a downstream auditor with the daemon's
491        // verifying key validates both chains uniformly.
492        let (signature, attest_level) =
493            match crate::governance::audit::try_sign_audit_payload(&hash) {
494                Some((sig, level)) => (Some(sig), level.to_string()),
495                None => (
496                    None,
497                    crate::models::AttestLevel::Unsigned.as_str().to_string(),
498                ),
499            };
500        let signed = SignedEvent {
501            id: uuid::Uuid::new_v4().to_string(),
502            agent_id: event.agent_id.clone(),
503            event_type: GOVERNANCE_REFUSAL_EVENT_TYPE.to_string(),
504            payload_hash: hash,
505            signature,
506            attest_level,
507            timestamp: event.timestamp.to_rfc3339(),
508            ..SignedEvent::default()
509        };
510
511        // Race-retry loop: SQLITE_CONSTRAINT_UNIQUE on the
512        // `idx_signed_events_sequence` index is the race-only failure
513        // mode. Every other rusqlite error path bails to DLQ.
514        let conn_path = self.db_path.clone();
515        let mut last_err: Option<anyhow::Error> = None;
516        for attempt in 0..=APPEND_UNIQUE_RACE_MAX_RETRIES {
517            let conn = self.ensure_conn()?;
518            match append_signed_event(conn, &signed) {
519                Ok(()) => return Ok(AppendOutcome::Appended),
520                Err(e) => {
521                    if is_unique_constraint_race(&e) && attempt < APPEND_UNIQUE_RACE_MAX_RETRIES {
522                        self.bump_race_retry();
523                        tracing::warn!(
524                            attempt = attempt + 1,
525                            db = %conn_path.display(),
526                            "deferred_audit sink: SQLITE_CONSTRAINT_UNIQUE on signed_events.sequence — \
527                             chain-head race; retrying (budget {APPEND_UNIQUE_RACE_MAX_RETRIES})"
528                        );
529                        last_err = Some(e);
530                        continue;
531                    }
532                    last_err = Some(e);
533                    break;
534                }
535            }
536        }
537
538        // Either the retry budget was exhausted on UNIQUE races OR
539        // the first attempt produced a non-race error. Land in DLQ.
540        //
541        // v0.7.0 #1046 (Agent-6 #7) — chain-log property advisory.
542        // At v0.7.0 the audit-chain delivery contract is:
543        //
544        //   **chain-log emission is "exactly-once OR DLQ-recoverable"**
545        //
546        // Specifically:
547        //   - On the happy path, every refusal lands exactly one
548        //     `governance.refusal` row in `signed_events` (the
549        //     append-only chain advances by one).
550        //   - On the DLQ-landing path (this branch), the refusal
551        //     row does NOT advance the chain — instead a row lands
552        //     in `signed_events_dlq` with the same `id`,
553        //     `agent_id`, `payload_hash`, and `failure_reason`.
554        //     The DLQ row preserves enough state to replay back
555        //     into the chain.
556        //   - There is NO boot-time DLQ→chain replay path at
557        //     v0.7.0. Operators with non-zero `dlq_landed_count`
558        //     should run the operator-side replay tooling (or
559        //     manual SQL: copy DLQ rows into `signed_events` after
560        //     resolving the chain-head race condition that caused
561        //     the DLQ landing).
562        //
563        // A future v0.8 change can wire a boot-time DLQ replay
564        // sweep that retries every `signed_events_dlq` entry into
565        // `signed_events` via `append_signed_event`. The current
566        // chain-log property is the load-bearing contract: the
567        // cryptographic chain never carries a "phantom" refusal
568        // (every chain row is a real append), and DLQ rows are
569        // recoverable via operator action.
570        let err = last_err.unwrap_or_else(|| anyhow::anyhow!("unknown drainer sink error"));
571        let failure_reason = format!("{err:#}");
572        let conn = self.ensure_conn()?;
573        conn.execute(
574            "INSERT INTO signed_events_dlq \
575             (id, agent_id, event_type, payload_hash, signature, attest_level, \
576              timestamp, failure_reason, failed_at) \
577             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
578            rusqlite::params![
579                signed.id,
580                signed.agent_id,
581                signed.event_type,
582                signed.payload_hash,
583                signed.signature,
584                signed.attest_level,
585                signed.timestamp,
586                failure_reason,
587                chrono::Utc::now().to_rfc3339(),
588            ],
589        )
590        .context("SqliteSignedEventsSink: DLQ insert")?;
591        tracing::error!(
592            failure_reason = %failure_reason,
593            agent_id = %signed.agent_id,
594            event_id = %signed.id,
595            "deferred_audit sink: append exhausted retries or hit non-race error — \
596             event landed in signed_events_dlq (chain row NOT advanced; replay needed)"
597        );
598        self.bump_dlq();
599        Ok(AppendOutcome::DlqLanded)
600    }
601}
602
603/// Cluster-C SEC-3 (issue #767) — classify an `anyhow::Error` produced
604/// by `append_signed_event` as a recoverable `SQLITE_CONSTRAINT_UNIQUE`
605/// chain-head race vs everything else.
606///
607/// The classification walks the chain of causes and matches on
608/// `rusqlite::Error::SqliteFailure` with extended code
609/// `SQLITE_CONSTRAINT_UNIQUE` (2067). We deliberately use the typed
610/// matcher rather than string-matching the rendered error so a future
611/// rusqlite version that tweaks the `Display` impl doesn't silently
612/// flip every race into a DLQ-land.
613fn is_unique_constraint_race(err: &anyhow::Error) -> bool {
614    for cause in err.chain() {
615        if let Some(rusqlite::Error::SqliteFailure(code, _)) =
616            cause.downcast_ref::<rusqlite::Error>()
617        {
618            if code.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE {
619                return true;
620            }
621        }
622    }
623    false
624}
625
626/// Cluster-C SEC-3 (issue #767) — live count of rows in
627/// `signed_events_dlq`.
628///
629/// Surfaced via the capabilities-v3 envelope's
630/// `approval.deferred_audit_dlq_size` field so operator dashboards
631/// see the current DLQ depth without scraping logs. Returns `0` on
632/// query failure (a missing table or transient lock); callers that
633/// want hard-fail behavior should query the table directly.
634///
635/// # Errors
636///
637/// Returns the underlying `rusqlite` error if the SELECT fails for a
638/// reason other than `SQLITE_NOMEM` (which we treat as transient).
639/// The capabilities pathway treats this as best-effort and falls
640/// through to 0 on error.
641pub fn dlq_size(conn: &rusqlite::Connection) -> Result<u64> {
642    let count: i64 = conn
643        .query_row("SELECT COUNT(*) FROM signed_events_dlq", [], |r| r.get(0))
644        .context("dlq_size: SELECT COUNT")?;
645    Ok(u64::try_from(count).unwrap_or(0))
646}
647
648/// Spawn a single drainer iteration. The returned `JoinHandle`
649/// completes (Ok) when the channel sender is dropped AND the
650/// receiver has been fully drained — graceful shutdown. A panic in
651/// the sink propagates through the `JoinHandle` (the
652/// [`spawn_supervised_drainer`] wrapper catches it and respawns).
653///
654/// Use [`spawn_supervised_drainer`] in production. This bare entry
655/// point is exposed for tests that want one-shot drainer behavior
656/// without supervisor restart.
657#[must_use]
658pub fn spawn_drainer_task<S: DeferredAuditSink + 'static>(
659    mut receiver: UnboundedReceiver<DeferredAuditEvent>,
660    mut sink: S,
661    metrics: DeferredAuditMetrics,
662) -> JoinHandle<UnboundedReceiver<DeferredAuditEvent>> {
663    tokio::spawn(async move {
664        while let Some(event) = receiver.recv().await {
665            match sink.append(&event) {
666                Ok(AppendOutcome::Appended) => {
667                    metrics.appended.fetch_add(1, Ordering::Relaxed);
668                }
669                Ok(AppendOutcome::DlqLanded) => {
670                    // Cluster-C SEC-3: the sink already captured the
671                    // event in `signed_events_dlq` and bumped its
672                    // own dlq_landed metric (if wired). We also bump
673                    // `append_failures` so existing dashboards that
674                    // alert on a non-zero append_failures count
675                    // still surface DLQ landings — operators read
676                    // the SAME signal regardless of which bucket
677                    // the row went into.
678                    metrics.append_failures.fetch_add(1, Ordering::Relaxed);
679                    tracing::warn!(
680                        "deferred_audit drainer: event landed in DLQ \
681                         (audit chain row NOT advanced; operator replay needed)"
682                    );
683                }
684                Err(e) => {
685                    metrics.append_failures.fetch_add(1, Ordering::Relaxed);
686                    tracing::error!(
687                        "deferred_audit drainer: sink.append failed (no DLQ landing either): {:#}",
688                        e
689                    );
690                    // We don't requeue — the channel is single-consumer.
691                    // The supervisor's panic-restart path is for sink
692                    // PANICS (poisoned state); soft errors here are
693                    // recorded and the loop continues.
694                }
695            }
696        }
697        // Sender dropped + channel drained → graceful shutdown. Return
698        // the receiver so the supervisor (which owns the sender
699        // ultimately via the queue handle) can drop it cleanly.
700        receiver
701    })
702}
703
704/// Supervisor: spawns the drainer with panic recovery. Any panic
705/// caught at the `JoinHandle::is_panic()` boundary triggers a
706/// respawn with a FRESH sink (`make_sink()`), preserving the
707/// receiver and the metrics handle.
708///
709/// The supervisor task returns when either:
710///   - The channel sender is dropped and the channel is fully
711///     drained (graceful shutdown).
712///   - `max_restarts` consecutive panics occur (default `u32::MAX`
713///     — effectively never gives up; an operator that wants to
714///     fail loudly on persistent panics can configure a finite
715///     limit).
716///
717/// The returned `JoinHandle` resolves when the supervisor exits.
718#[must_use]
719pub fn spawn_supervised_drainer<F, S>(
720    receiver: UnboundedReceiver<DeferredAuditEvent>,
721    make_sink: F,
722    metrics: DeferredAuditMetrics,
723    max_restarts: u32,
724) -> JoinHandle<()>
725where
726    F: Fn() -> S + Send + 'static,
727    S: DeferredAuditSink + 'static,
728{
729    tokio::spawn(async move {
730        // Drainer iteration. The receiver lives inside the spawned
731        // task; on graceful shutdown the task returns it back to
732        // us. On panic the receiver is lost — see the
733        // documentation block for the supervisor restart pattern.
734        let sink = make_sink();
735        let handle = spawn_drainer_task(receiver, sink, metrics.clone());
736        match handle.await {
737            Ok(returned_receiver) => {
738                // Drainer exited gracefully — sender dropped + drained.
739                drop(returned_receiver);
740            }
741            Err(join_err) if join_err.is_panic() => {
742                metrics.drainer_panics.fetch_add(1, Ordering::Relaxed);
743                tracing::error!(
744                    "deferred_audit supervisor: drainer task panicked ({join_err}); \
745                     max_restarts={max_restarts} — receiver moved into the panicked \
746                     task and cannot be recovered; future refusals submitted to the \
747                     existing queue will fail to land. Operator action required: \
748                     rebuild the daemon's deferred-audit queue (or restart the daemon) \
749                     to restore the audit-chain property."
750                );
751                // We cannot loop without a valid receiver. The
752                // max_restarts variable is preserved in the API so
753                // future revisions can introduce a buffering scheme
754                // that lets the supervisor recover the in-flight
755                // events; today the contract is "panic in drainer
756                // = chain loss on the unflushed buffer, future
757                // events recorded as send_failures".
758                let _ = max_restarts;
759            }
760            Err(join_err) => {
761                // Cancellation (task aborted) — treat as shutdown.
762                tracing::warn!(
763                    "deferred_audit supervisor: drainer aborted ({join_err}); \
764                     pending events may be lost"
765                );
766            }
767        }
768    })
769}
770
771/// Close the queue and wait for the supervisor task to drain every
772/// pending event. After this returns the chain-log property is
773/// "every refusal submitted before close lands in `signed_events`."
774///
775/// # Errors
776///
777/// Returns the `tokio::task::JoinError` if the supervisor task
778/// panicked while draining (rare — the supervisor catches drainer
779/// panics, but its own panic would surface here).
780pub async fn close_and_flush(
781    queue: DeferredAuditQueue,
782    supervisor: JoinHandle<()>,
783) -> std::result::Result<(), tokio::task::JoinError> {
784    // Drop the producer-side sender — once every clone is dropped,
785    // the receiver's `recv().await` returns None and the drainer
786    // exits gracefully.
787    drop(queue);
788    supervisor.await
789}
790
/// Whole-seconds magnitude of [`DEFAULT_SHUTDOWN_DRAIN_TIMEOUT`], named so
/// the value does not appear as a bare literal inside the `Duration` const.
const SHUTDOWN_DRAIN_TIMEOUT_SECS: u64 = 5;

/// Default bound for [`drain_pending`] on the daemon's shutdown path.
///
/// This is how long shutdown waits for the drainer to flush every submitted
/// refusal into `signed_events`. After that it gives up and moves on to the
/// WAL checkpoint and process exit.
///
/// At the sink's estimated ~25–100 µs per append, five seconds leaves room
/// for a backlog of tens of thousands of events. It still caps shutdown
/// latency if the drainer is genuinely wedged.
pub const DEFAULT_SHUTDOWN_DRAIN_TIMEOUT: std::time::Duration =
    std::time::Duration::from_secs(SHUTDOWN_DRAIN_TIMEOUT_SECS);
803
/// Whole-milliseconds magnitude of `DRAIN_POLL_INTERVAL`.
const DRAIN_POLL_INTERVAL_MILLIS: u64 = 10;

/// How often [`drain_pending`] re-samples the drainer's atomic counters.
/// The counters are advanced from another task, so the shutdown path polls
/// at this cadence instead of busy-spinning.
const DRAIN_POLL_INTERVAL: std::time::Duration =
    std::time::Duration::from_millis(DRAIN_POLL_INTERVAL_MILLIS);
812
813/// Wait (bounded) until every event submitted to the queue has been
814/// accounted for by the drainer — appended to `signed_events`, landed in
815/// the DLQ / counted as an append failure, or recorded as a send failure.
816///
817/// This is the daemon shutdown-path counterpart to [`close_and_flush`].
818/// Unlike `close_and_flush`, it does NOT require the producer-side senders
819/// to be dropped: the daemon installs the queue's sender clones inside
820/// process-wide `OnceLock` governance hooks
821/// (`storage::GOVERNANCE_PRE_WRITE`, `wire_check::GOVERNANCE_PRE_ACTION`)
822/// that live for the entire process lifetime, so the channel never closes
823/// and the drainer's `recv().await` never returns `None`. Awaiting the
824/// supervisor would therefore block forever. Instead we wait for the
825/// drainer to catch up to the submitted count by polling the shared atomic
826/// metrics.
827///
828/// MUST be called only after every write path that can submit a refusal
829/// has quiesced (i.e. after the HTTP server's graceful-shutdown future has
830/// resolved). At that point `submitted` is final, so the loop terminates
831/// as soon as the drainer finishes the backlog.
832///
833/// Returns `true` when the queue fully drained within `timeout`, `false`
834/// when the timeout elapsed with events still in flight (the caller should
835/// log the residual so the audit gap is visible to operators).
836pub async fn drain_pending(metrics: &DeferredAuditMetrics, timeout: std::time::Duration) -> bool {
837    let deadline = tokio::time::Instant::now() + timeout;
838    loop {
839        if drain_accounted(metrics) >= metrics.submitted_count() {
840            return true;
841        }
842        if tokio::time::Instant::now() >= deadline {
843            return false;
844        }
845        tokio::time::sleep(DRAIN_POLL_INTERVAL).await;
846    }
847}
848
849/// Number of submitted events the drainer has finished with — every
850/// received event bumps exactly one of `appended` / `append_failures`
851/// (the latter also covers DLQ landings), and a closed receiver bumps
852/// `send_failures`. The sum equalling `submitted` means the backlog is
853/// fully processed.
854#[must_use]
855fn drain_accounted(metrics: &DeferredAuditMetrics) -> u64 {
856    metrics
857        .appended_count()
858        .saturating_add(metrics.append_failure_count())
859        .saturating_add(metrics.send_failure_count())
860}
861
862// ---------------------------------------------------------------------------
863// Convenience installer for the daemon path
864// ---------------------------------------------------------------------------
865
866/// Build a queue + spawn a supervised drainer in one call. Returns
867/// the producer handle and the supervisor `JoinHandle` — the daemon
868/// stashes the queue on `AppState` and the join handle in
869/// `task_handles` so `serve` aborts it on shutdown.
870///
871/// The drainer opens a FRESH `Connection` per its sink (via
872/// `SqliteSignedEventsSink::new(db_path)`); on respawn after panic
873/// the sink is rebuilt verbatim. No connection is shared with the
874/// substrate writer.
875#[must_use]
876pub fn install_deferred_audit_drainer(db_path: &Path) -> (DeferredAuditQueue, JoinHandle<()>) {
877    let (queue, receiver) = DeferredAuditQueue::new();
878    let metrics = queue.metrics();
879    let db_path_buf = db_path.to_path_buf();
880    let metrics_for_factory = metrics.clone();
881    let supervisor = spawn_supervised_drainer(
882        receiver,
883        move || {
884            SqliteSignedEventsSink::with_metrics(db_path_buf.clone(), metrics_for_factory.clone())
885        },
886        metrics,
887        u32::MAX,
888    );
889    (queue, supervisor)
890}
891
892// ---------------------------------------------------------------------------
893// Tests
894// ---------------------------------------------------------------------------
895
896#[cfg(test)]
897mod tests {
898    use super::*;
899    use std::sync::Mutex;
900
901    // -----------------------------------------------------------------
902    // DeferredAuditEvent shape + canonical bytes
903    // -----------------------------------------------------------------
904
905    fn refusal_action() -> AgentAction {
906        AgentAction::Custom {
907            custom_kind: "memory_write".to_string(),
908            payload: serde_json::json!({"namespace": "secrets/api"}),
909        }
910    }
911
912    fn refusal_decision() -> Decision {
913        Decision::Refuse {
914            rule_id: "R001".to_string(),
915            reason: "no writes to secrets/*".to_string(),
916        }
917    }
918
919    #[test]
920    fn from_refusal_returns_some_for_refuse() {
921        let event =
922            DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &refusal_decision())
923                .expect("must be Some for Refuse verdict");
924        assert_eq!(event.agent_id, "agent:alice");
925        assert_eq!(event.rule_id(), Some("R001"));
926        assert_eq!(event.reason(), Some("no writes to secrets/*"));
927    }
928
929    #[test]
930    fn from_refusal_returns_none_for_allow() {
931        let event =
932            DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &Decision::Allow);
933        assert!(event.is_none(), "Allow verdict must not enqueue an event");
934    }
935
936    #[test]
937    fn from_refusal_returns_none_for_warn() {
938        let warn = Decision::Warn {
939            rule_id: "W001".to_string(),
940            reason: "warning only".to_string(),
941        };
942        let event = DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &warn);
943        assert!(event.is_none(), "Warn verdict must not enqueue a refusal");
944    }
945
946    #[test]
947    fn canonical_bytes_includes_rule_and_action() {
948        let event =
949            DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &refusal_decision())
950                .unwrap();
951        let bytes = event.canonical_bytes().unwrap();
952        let s = std::str::from_utf8(&bytes).unwrap();
953        assert!(s.contains("R001"), "canonical payload must include rule_id");
954        assert!(
955            s.contains("memory_write"),
956            "canonical payload must include action kind"
957        );
958        assert!(
959            s.contains("agent:alice"),
960            "canonical payload must include agent id"
961        );
962    }
963
964    #[test]
965    fn rule_id_returns_none_for_non_refusal() {
966        let event = DeferredAuditEvent {
967            agent_id: "x".into(),
968            action: refusal_action(),
969            decision: Decision::Allow,
970            timestamp: chrono::Utc::now(),
971        };
972        assert!(event.rule_id().is_none());
973        assert!(event.reason().is_none());
974    }
975
976    // -----------------------------------------------------------------
977    // DeferredAuditQueue submit + non-blocking semantics
978    // -----------------------------------------------------------------
979
980    #[tokio::test]
981    async fn queue_new_returns_open_handle() {
982        let (queue, _rx) = DeferredAuditQueue::new();
983        assert!(queue.is_open());
984        assert_eq!(queue.metrics().submitted_count(), 0);
985    }
986
987    #[tokio::test]
988    async fn submit_with_receiver_attached_succeeds() {
989        let (queue, mut rx) = DeferredAuditQueue::new();
990        let event =
991            DeferredAuditEvent::from_refusal("agent:t", &refusal_action(), &refusal_decision())
992                .unwrap();
993        assert!(queue.submit(event.clone()));
994        assert_eq!(queue.metrics().submitted_count(), 1);
995        let received = rx.recv().await.unwrap();
996        assert_eq!(received.agent_id, event.agent_id);
997        assert_eq!(received.rule_id(), Some("R001"));
998    }
999
1000    #[tokio::test]
1001    async fn submit_after_receiver_dropped_records_send_failure() {
1002        let (queue, rx) = DeferredAuditQueue::new();
1003        drop(rx);
1004        // sender knows its peer is gone
1005        assert!(!queue.is_open());
1006        let event =
1007            DeferredAuditEvent::from_refusal("agent:t", &refusal_action(), &refusal_decision())
1008                .unwrap();
1009        let ok = queue.submit(event);
1010        assert!(!ok, "submit must return false when receiver is closed");
1011        assert_eq!(queue.metrics().submitted_count(), 1);
1012        assert_eq!(queue.metrics().send_failure_count(), 1);
1013    }
1014
1015    #[tokio::test]
1016    async fn submit_refusal_helper_skips_non_refusals() {
1017        let (queue, mut rx) = DeferredAuditQueue::new();
1018        // Allow does NOT enqueue
1019        let enq = queue.submit_refusal("agent:t", &refusal_action(), &Decision::Allow);
1020        assert!(!enq);
1021        // Try receiving — should timeout (channel empty)
1022        let recv = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1023        assert!(recv.is_err(), "no event should have been enqueued");
1024        // Refusal DOES enqueue
1025        let enq2 = queue.submit_refusal("agent:t", &refusal_action(), &refusal_decision());
1026        assert!(enq2);
1027        let event = rx.recv().await.unwrap();
1028        assert_eq!(event.agent_id, "agent:t");
1029    }
1030
1031    #[tokio::test]
1032    async fn queue_clone_shares_underlying_channel() {
1033        let (queue, mut rx) = DeferredAuditQueue::new();
1034        let clone = queue.clone();
1035        let event1 =
1036            DeferredAuditEvent::from_refusal("agent:a", &refusal_action(), &refusal_decision())
1037                .unwrap();
1038        let event2 =
1039            DeferredAuditEvent::from_refusal("agent:b", &refusal_action(), &refusal_decision())
1040                .unwrap();
1041        queue.submit(event1);
1042        clone.submit(event2);
1043        let r1 = rx.recv().await.unwrap();
1044        let r2 = rx.recv().await.unwrap();
1045        let agents: Vec<_> = vec![r1.agent_id, r2.agent_id];
1046        assert!(agents.contains(&"agent:a".to_string()));
1047        assert!(agents.contains(&"agent:b".to_string()));
1048        // Both sides share the same metrics handle
1049        assert_eq!(queue.metrics().submitted_count(), 2);
1050        assert_eq!(clone.metrics().submitted_count(), 2);
1051    }
1052
1053    // -----------------------------------------------------------------
1054    // Drainer task behavior with mock sink
1055    // -----------------------------------------------------------------
1056
1057    /// Mock sink: stores every received event in an owned Vec for
1058    /// post-condition assertions; optionally panics on the Nth
1059    /// event to drive supervisor-recovery tests.
1060    #[derive(Clone, Default)]
1061    struct MockSink {
1062        // Recorded events, behind a mutex so the test can read while
1063        // the drainer writes.
1064        recorded: Arc<Mutex<Vec<DeferredAuditEvent>>>,
1065        // Optional: panic on the Nth append (zero-indexed).
1066        panic_on: Option<usize>,
1067        // Optional: error on the Nth append (zero-indexed).
1068        error_on: Option<usize>,
1069        // Counter (shared across clones for supervisor-restart
1070        // counting).
1071        call_count: Arc<AtomicU64>,
1072    }
1073
1074    impl DeferredAuditSink for MockSink {
1075        fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
1076            let prior = self.call_count.fetch_add(1, Ordering::SeqCst) as usize;
1077            if Some(prior) == self.panic_on {
1078                panic!("mock sink: configured panic at call {prior}");
1079            }
1080            if Some(prior) == self.error_on {
1081                return Err(anyhow::anyhow!(
1082                    "mock sink: configured error at call {prior}"
1083                ));
1084            }
1085            self.recorded.lock().unwrap().push(event.clone());
1086            Ok(AppendOutcome::Appended)
1087        }
1088    }
1089
1090    #[tokio::test]
1091    async fn drainer_appends_every_submitted_event() {
1092        let (queue, rx) = DeferredAuditQueue::new();
1093        let metrics = queue.metrics();
1094        let sink = MockSink::default();
1095        let recorded = sink.recorded.clone();
1096        let handle = spawn_drainer_task(rx, sink, metrics.clone());
1097
1098        for i in 0..5 {
1099            let mut event = DeferredAuditEvent::from_refusal(
1100                &format!("agent:{i}"),
1101                &refusal_action(),
1102                &refusal_decision(),
1103            )
1104            .unwrap();
1105            event.timestamp = chrono::Utc::now();
1106            queue.submit(event);
1107        }
1108
1109        // Drop the queue (sender) to terminate the drainer.
1110        drop(queue);
1111        let _returned_rx = handle.await.unwrap();
1112
1113        let recorded = recorded.lock().unwrap();
1114        assert_eq!(recorded.len(), 5);
1115        for (i, ev) in recorded.iter().enumerate() {
1116            assert_eq!(ev.agent_id, format!("agent:{i}"));
1117        }
1118        assert_eq!(metrics.appended_count(), 5);
1119    }
1120
1121    #[tokio::test]
1122    async fn drainer_continues_after_sink_error() {
1123        // Sink errors on the second call; the drainer should
1124        // record the error in metrics and proceed to handle
1125        // subsequent events.
1126        let (queue, rx) = DeferredAuditQueue::new();
1127        let metrics = queue.metrics();
1128        let mut sink = MockSink::default();
1129        sink.error_on = Some(1);
1130        let recorded = sink.recorded.clone();
1131        let handle = spawn_drainer_task(rx, sink, metrics.clone());
1132
1133        for i in 0..3 {
1134            let event = DeferredAuditEvent::from_refusal(
1135                &format!("agent:{i}"),
1136                &refusal_action(),
1137                &refusal_decision(),
1138            )
1139            .unwrap();
1140            queue.submit(event);
1141        }
1142        drop(queue);
1143        let _ = handle.await.unwrap();
1144        // Event 0 and 2 landed; event 1 hit the error.
1145        let recorded = recorded.lock().unwrap();
1146        assert_eq!(recorded.len(), 2);
1147        assert_eq!(metrics.appended_count(), 2);
1148        assert_eq!(metrics.append_failure_count(), 1);
1149    }
1150
1151    // -----------------------------------------------------------------
1152    // Supervisor: panic recovery + graceful shutdown
1153    // -----------------------------------------------------------------
1154
1155    #[tokio::test]
1156    async fn supervisor_records_panic_metric_on_drainer_panic() {
1157        // Sink panics on first call. The supervisor catches the
1158        // panic, bumps drainer_panics, and (per current
1159        // implementation) terminates after the panic — the receiver
1160        // moved into the panicked task and cannot be recovered.
1161        // We verify the panic-counter side-effect.
1162        let (queue, rx) = DeferredAuditQueue::new();
1163        let metrics = queue.metrics();
1164        let panic_on = Some(0_usize);
1165        let supervisor = spawn_supervised_drainer(
1166            rx,
1167            move || MockSink {
1168                recorded: Arc::new(Mutex::new(Vec::new())),
1169                panic_on,
1170                error_on: None,
1171                call_count: Arc::new(AtomicU64::new(0)),
1172            },
1173            metrics.clone(),
1174            1, // max 1 restart (= no respawn beyond the initial spawn)
1175        );
1176
1177        let event =
1178            DeferredAuditEvent::from_refusal("agent:panic", &refusal_action(), &refusal_decision())
1179                .unwrap();
1180        queue.submit(event);
1181        // Wait for the supervisor to observe the panic and exit.
1182        let _ = tokio::time::timeout(std::time::Duration::from_secs(2), supervisor)
1183            .await
1184            .expect("supervisor must exit after observing panic");
1185        assert_eq!(
1186            metrics.panic_count(),
1187            1,
1188            "supervisor must record exactly one drainer panic"
1189        );
1190    }
1191
1192    #[tokio::test]
1193    async fn supervisor_graceful_shutdown_drains_buffered_events() {
1194        let (queue, rx) = DeferredAuditQueue::new();
1195        let metrics = queue.metrics();
1196        let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
1197        let recorded_for_factory = recorded.clone();
1198        let supervisor = spawn_supervised_drainer(
1199            rx,
1200            move || MockSink {
1201                recorded: recorded_for_factory.clone(),
1202                panic_on: None,
1203                error_on: None,
1204                call_count: Arc::new(AtomicU64::new(0)),
1205            },
1206            metrics.clone(),
1207            u32::MAX,
1208        );
1209
1210        // Submit 50 events.
1211        for i in 0..50 {
1212            let event = DeferredAuditEvent::from_refusal(
1213                &format!("agent:{i}"),
1214                &refusal_action(),
1215                &refusal_decision(),
1216            )
1217            .unwrap();
1218            queue.submit(event);
1219        }
1220
1221        // Initiate shutdown — close_and_flush drops the queue and
1222        // awaits the supervisor.
1223        close_and_flush(queue, supervisor)
1224            .await
1225            .expect("supervisor must terminate cleanly");
1226
1227        let recorded = recorded.lock().unwrap();
1228        assert_eq!(
1229            recorded.len(),
1230            50,
1231            "shutdown must drain every buffered event"
1232        );
1233        assert_eq!(metrics.appended_count(), 50);
1234    }
1235
1236    // -----------------------------------------------------------------
1237    // drain_pending — shutdown drain that does NOT require the senders to
1238    // be dropped (the daemon's governance hooks hold OnceLock-resident
1239    // sender clones that live for the whole process, so the channel never
1240    // closes — `close_and_flush` would block forever; `drain_pending`
1241    // polls the metrics instead).
1242    // -----------------------------------------------------------------
1243
1244    #[tokio::test]
1245    async fn drain_pending_completes_without_closing_channel() {
1246        let (queue, rx) = DeferredAuditQueue::new();
1247        let metrics = queue.metrics();
1248        let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
1249        let recorded_for_factory = recorded.clone();
1250        let supervisor = spawn_supervised_drainer(
1251            rx,
1252            move || MockSink {
1253                recorded: recorded_for_factory.clone(),
1254                panic_on: None,
1255                error_on: None,
1256                call_count: Arc::new(AtomicU64::new(0)),
1257            },
1258            metrics.clone(),
1259            u32::MAX,
1260        );
1261
1262        for i in 0..50 {
1263            let event = DeferredAuditEvent::from_refusal(
1264                &format!("agent:{i}"),
1265                &refusal_action(),
1266                &refusal_decision(),
1267            )
1268            .unwrap();
1269            queue.submit(event);
1270        }
1271
1272        // Simulate the daemon's OnceLock-held sender by keeping a clone
1273        // alive across the drain — the channel MUST stay open the whole
1274        // time (this is exactly the scenario that wedges close_and_flush).
1275        let hook_held_sender = queue.clone();
1276
1277        let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
1278        assert!(
1279            drained,
1280            "drain_pending must complete while the channel is open"
1281        );
1282        assert!(
1283            hook_held_sender.is_open(),
1284            "channel must still be open — drain_pending must not depend on sender drop"
1285        );
1286        assert_eq!(metrics.appended_count(), 50);
1287        assert_eq!(recorded.lock().unwrap().len(), 50);
1288
1289        // Cleanup: drop both producer handles so the drainer can exit.
1290        drop(hook_held_sender);
1291        drop(queue);
1292        let _ = supervisor.await;
1293    }
1294
1295    #[tokio::test]
1296    async fn drain_pending_times_out_when_drainer_absent() {
1297        // No drainer spawned: submitted events sit in the channel buffer
1298        // forever, so the metrics never advance and drain_pending must
1299        // report a timeout (false) rather than hang.
1300        let (queue, _rx) = DeferredAuditQueue::new();
1301        let metrics = queue.metrics();
1302        for i in 0..3 {
1303            let event = DeferredAuditEvent::from_refusal(
1304                &format!("agent:{i}"),
1305                &refusal_action(),
1306                &refusal_decision(),
1307            )
1308            .unwrap();
1309            queue.submit(event);
1310        }
1311        let drained = drain_pending(&metrics, std::time::Duration::from_millis(100)).await;
1312        assert!(
1313            !drained,
1314            "drain_pending must return false when events never get accounted"
1315        );
1316        assert_eq!(metrics.submitted_count(), 3);
1317        assert_eq!(metrics.appended_count(), 0);
1318    }
1319
1320    #[tokio::test]
1321    async fn drain_pending_returns_immediately_when_already_drained() {
1322        let (queue, _rx) = DeferredAuditQueue::new();
1323        let metrics = queue.metrics();
1324        // Nothing submitted → already drained → returns true at once.
1325        let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
1326        assert!(drained);
1327    }
1328
1329    #[tokio::test]
1330    async fn drain_pending_counts_append_failures_as_accounted() {
1331        // A sink that always errors still "accounts" for the event via
1332        // append_failures, so drain_pending must terminate (the audit row
1333        // is lost/DLQ'd but the shutdown path must not hang on it).
1334        let (queue, rx) = DeferredAuditQueue::new();
1335        let metrics = queue.metrics();
1336        // Factory yields a sink that errors on its first (only) call, so
1337        // the event is "accounted" via append_failures rather than appended.
1338        let supervisor = spawn_supervised_drainer(
1339            rx,
1340            move || MockSink {
1341                recorded: Arc::new(Mutex::new(Vec::new())),
1342                panic_on: None,
1343                error_on: Some(0),
1344                call_count: Arc::new(AtomicU64::new(0)),
1345            },
1346            metrics.clone(),
1347            u32::MAX,
1348        );
1349        let event =
1350            DeferredAuditEvent::from_refusal("agent:err", &refusal_action(), &refusal_decision())
1351                .unwrap();
1352        queue.submit(event);
1353        let hook_held = queue.clone();
1354        let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
1355        assert!(
1356            drained,
1357            "append failures count as accounted — must not hang"
1358        );
1359        assert_eq!(metrics.append_failure_count(), 1);
1360        drop(hook_held);
1361        drop(queue);
1362        let _ = supervisor.await;
1363    }
1364
1365    #[tokio::test]
1366    async fn close_and_flush_works_with_zero_events() {
1367        let (queue, rx) = DeferredAuditQueue::new();
1368        let metrics = queue.metrics();
1369        let supervisor =
1370            spawn_supervised_drainer(rx, move || MockSink::default(), metrics.clone(), u32::MAX);
1371        close_and_flush(queue, supervisor).await.unwrap();
1372        assert_eq!(metrics.appended_count(), 0);
1373        assert_eq!(metrics.submitted_count(), 0);
1374    }
1375
1376    // -----------------------------------------------------------------
1377    // High-volume / backpressure-edge — drainer slow, many submits
1378    // queued, all drain eventually
1379    // -----------------------------------------------------------------
1380
1381    /// Slow sink: artificial 1 ms delay per append to simulate a
1382    /// busy fsync path.
1383    struct SlowSink {
1384        recorded: Arc<Mutex<Vec<DeferredAuditEvent>>>,
1385    }
1386
1387    impl DeferredAuditSink for SlowSink {
1388        fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
1389            std::thread::sleep(std::time::Duration::from_millis(1));
1390            self.recorded.lock().unwrap().push(event.clone());
1391            Ok(AppendOutcome::Appended)
1392        }
1393    }
1394
1395    #[tokio::test]
1396    async fn unbounded_queue_handles_burst_no_drops() {
1397        let (queue, rx) = DeferredAuditQueue::new();
1398        let metrics = queue.metrics();
1399        let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
1400        let recorded_for_factory = recorded.clone();
1401        let supervisor = spawn_supervised_drainer(
1402            rx,
1403            move || SlowSink {
1404                recorded: recorded_for_factory.clone(),
1405            },
1406            metrics.clone(),
1407            u32::MAX,
1408        );
1409
1410        // Burst 200 events (drainer is ~1 ms/event so this will
1411        // accumulate before draining).
1412        for i in 0..200 {
1413            let event = DeferredAuditEvent::from_refusal(
1414                &format!("agent:{i}"),
1415                &refusal_action(),
1416                &refusal_decision(),
1417            )
1418            .unwrap();
1419            assert!(
1420                queue.submit(event),
1421                "unbounded queue must never refuse a submit"
1422            );
1423        }
1424        assert_eq!(metrics.submitted_count(), 200);
1425        assert_eq!(metrics.send_failure_count(), 0);
1426
1427        close_and_flush(queue, supervisor).await.unwrap();
1428        let recorded = recorded.lock().unwrap();
1429        assert_eq!(recorded.len(), 200);
1430        assert_eq!(metrics.appended_count(), 200);
1431    }
1432
1433    // -----------------------------------------------------------------
1434    // Production-sink (SqliteSignedEventsSink) integration — opens a
1435    // real SQLite Connection against a temp file and asserts the row
1436    // lands.
1437    // -----------------------------------------------------------------
1438
1439    fn fresh_tempdir() -> tempfile::TempDir {
1440        // Honor project hard rule: no /tmp writes by name. The
1441        // tempfile crate honors TMPDIR (exported at session
1442        // bootstrap to .local-runs/tmp), so this resolves under the
1443        // project-local scratch tree.
1444        tempfile::tempdir().expect("tempdir")
1445    }
1446
1447    #[tokio::test]
1448    async fn sqlite_sink_appends_governance_refusal_row() {
1449        let dir = fresh_tempdir();
1450        let db_path = dir.path().join("def-audit-test.db");
1451        // Pre-create the schema via crate::db::open (applies
1452        // migrations including signed_events).
1453        let _ = crate::db::open(&db_path).expect("init db");
1454
1455        let (queue, rx) = DeferredAuditQueue::new();
1456        let metrics = queue.metrics();
1457        let db_path_buf = db_path.clone();
1458        let supervisor = spawn_supervised_drainer(
1459            rx,
1460            move || SqliteSignedEventsSink::new(db_path_buf.clone()),
1461            metrics.clone(),
1462            u32::MAX,
1463        );
1464
1465        let event =
1466            DeferredAuditEvent::from_refusal("agent:int", &refusal_action(), &refusal_decision())
1467                .unwrap();
1468        queue.submit(event);
1469
1470        close_and_flush(queue, supervisor).await.unwrap();
1471        assert_eq!(metrics.appended_count(), 1);
1472
1473        // Verify the row landed with event_type=governance.refusal.
1474        let conn = crate::db::open(&db_path).expect("reopen db");
1475        let count: i64 = conn
1476            .query_row(
1477                "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1 AND agent_id = ?2",
1478                rusqlite::params![GOVERNANCE_REFUSAL_EVENT_TYPE, "agent:int"],
1479                |r| r.get(0),
1480            )
1481            .unwrap();
1482        assert_eq!(count, 1, "drainer must have written the row");
1483    }
1484
1485    #[tokio::test]
1486    async fn sqlite_sink_lazy_open_only_on_first_append() {
1487        // Construct a sink against a path that doesn't exist yet; if
1488        // we never call append, ensure_conn must never run.
1489        let nonexistent = std::path::PathBuf::from("/this/path/does/not/exist/db.sqlite");
1490        let sink = SqliteSignedEventsSink::new(nonexistent);
1491        // Just verifying construction doesn't open the DB — no
1492        // assertion needed; if `new` opened eagerly, this would
1493        // already have errored.
1494        drop(sink);
1495    }
1496
1497    #[tokio::test]
1498    async fn sqlite_sink_append_fails_on_bad_path_metrics_increments() {
1499        let (queue, rx) = DeferredAuditQueue::new();
1500        let metrics = queue.metrics();
1501        // Build a sink pointing at a path that can't be opened (a
1502        // directory that doesn't exist + a non-creatable subdir
1503        // would do it; under macOS/Linux we use a path under /sys
1504        // which is read-only).
1505        let bad_path =
1506            std::path::PathBuf::from("/nonexistent-readonly-dir-for-deferred-audit-test/db.sqlite");
1507        let supervisor = spawn_supervised_drainer(
1508            rx,
1509            move || SqliteSignedEventsSink::new(bad_path.clone()),
1510            metrics.clone(),
1511            u32::MAX,
1512        );
1513        let event =
1514            DeferredAuditEvent::from_refusal("agent:bad", &refusal_action(), &refusal_decision())
1515                .unwrap();
1516        queue.submit(event);
1517        // Allow the drainer to attempt the append.
1518        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1519        close_and_flush(queue, supervisor).await.unwrap();
1520        assert!(
1521            metrics.append_failure_count() >= 1,
1522            "append failure on bad path must be recorded; got {}",
1523            metrics.append_failure_count()
1524        );
1525        assert_eq!(metrics.appended_count(), 0);
1526    }
1527
1528    // -----------------------------------------------------------------
1529    // install_deferred_audit_drainer end-to-end (the daemon-facing
1530    // installer)
1531    // -----------------------------------------------------------------
1532
1533    #[tokio::test]
1534    async fn installer_returns_open_queue_and_running_supervisor() {
1535        let dir = fresh_tempdir();
1536        let db_path = dir.path().join("installer-test.db");
1537        let _ = crate::db::open(&db_path).expect("init db");
1538
1539        let (queue, supervisor) = install_deferred_audit_drainer(&db_path);
1540        assert!(queue.is_open());
1541
1542        let event = DeferredAuditEvent::from_refusal(
1543            "agent:installer",
1544            &refusal_action(),
1545            &refusal_decision(),
1546        )
1547        .unwrap();
1548        queue.submit(event);
1549
1550        close_and_flush(queue, supervisor).await.unwrap();
1551
1552        let conn = crate::db::open(&db_path).expect("reopen db");
1553        let count: i64 = conn
1554            .query_row(
1555                "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1",
1556                rusqlite::params![GOVERNANCE_REFUSAL_EVENT_TYPE],
1557                |r| r.get(0),
1558            )
1559            .unwrap();
1560        assert_eq!(count, 1);
1561    }
1562
1563    // -----------------------------------------------------------------
1564    // Cluster-C SEC-3 (issue #767) — DLQ + race-retry coverage
1565    // -----------------------------------------------------------------
1566
1567    /// When the sink hits a non-race rusqlite error on the
1568    /// `signed_events` INSERT, the event MUST land in
1569    /// `signed_events_dlq` (NOT silently dropped) and the
1570    /// `dlq_landed` counter MUST bump.
1571    ///
1572    /// Failure injection: pre-fill `signed_events` with a row whose
1573    /// `id` collides with the next UUIDv4 the sink will mint. We can't
1574    /// predict UUIDs, so we instead break the table at the schema
1575    /// level: drop the `event_type` column. The next `append_signed_event`
1576    /// fails on the INSERT (column not found) — an unrecoverable
1577    /// non-race error that triggers DLQ land.
1578    #[tokio::test]
1579    async fn sqlite_sink_lands_event_in_dlq_on_unrecoverable_error() {
1580        let dir = fresh_tempdir();
1581        let db_path = dir.path().join("dlq-test.db");
1582        let _ = crate::db::open(&db_path).expect("init db");
1583
1584        // Inject the fault: drop the NOT-NULL `event_type` column from
1585        // signed_events. Subsequent inserts fail with a constraint
1586        // error (NULL into NOT NULL slot) — an unrecoverable non-race
1587        // error. We do this by rebuilding the table without the
1588        // column; the DLQ table stays intact.
1589        {
1590            let conn = crate::db::open(&db_path).expect("open for fault inject");
1591            conn.execute_batch(
1592                "DROP TABLE signed_events; \
1593                 CREATE TABLE signed_events ( \
1594                    id TEXT PRIMARY KEY, \
1595                    agent_id TEXT NOT NULL, \
1596                    payload_hash BLOB NOT NULL, \
1597                    signature BLOB, \
1598                    attest_level TEXT NOT NULL DEFAULT 'unsigned', \
1599                    timestamp TEXT NOT NULL, \
1600                    prev_hash BLOB, \
1601                    sequence INTEGER \
1602                 ); \
1603                 CREATE UNIQUE INDEX idx_signed_events_sequence \
1604                    ON signed_events(sequence);",
1605            )
1606            .expect("fault-inject schema rewrite");
1607        }
1608
1609        // Spawn drainer + submit one event. The sink will fail the
1610        // append (column missing → schema mismatch is unrecoverable)
1611        // and land it in DLQ.
1612        let (queue, supervisor) = install_deferred_audit_drainer(&db_path);
1613        let metrics = queue.metrics();
1614        let event = DeferredAuditEvent::from_refusal(
1615            "agent:dlq-test",
1616            &refusal_action(),
1617            &refusal_decision(),
1618        )
1619        .unwrap();
1620        queue.submit(event);
1621        close_and_flush(queue, supervisor).await.unwrap();
1622
1623        // Assert: chain row NOT written, DLQ row present, counters
1624        // reflect the outcome.
1625        let conn = crate::db::open(&db_path).expect("reopen db");
1626        let chain_count: i64 = conn
1627            .query_row("SELECT COUNT(*) FROM signed_events", [], |r| r.get(0))
1628            .unwrap_or(0);
1629        assert_eq!(
1630            chain_count, 0,
1631            "signed_events chain MUST NOT advance when sink fails"
1632        );
1633        let dlq_count: i64 = conn
1634            .query_row("SELECT COUNT(*) FROM signed_events_dlq", [], |r| r.get(0))
1635            .unwrap();
1636        assert_eq!(dlq_count, 1, "exactly one DLQ row expected");
1637        let dlq_size_live = dlq_size(&conn).expect("dlq_size");
1638        assert_eq!(dlq_size_live, 1, "dlq_size helper must reflect live count");
1639        assert!(
1640            metrics.dlq_landed_count() >= 1,
1641            "dlq_landed metric must bump on DLQ landing"
1642        );
1643    }
1644
1645    /// `is_unique_constraint_race` MUST return true for a synthetic
1646    /// `SQLITE_CONSTRAINT_UNIQUE` error and false for other rusqlite
1647    /// errors. Pins the classification helper against rusqlite
1648    /// version drift.
1649    #[test]
1650    fn is_unique_constraint_race_classifies_correctly() {
1651        let unique_err = rusqlite::Error::SqliteFailure(
1652            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE),
1653            Some("UNIQUE constraint failed".to_string()),
1654        );
1655        let other_err = rusqlite::Error::SqliteFailure(
1656            rusqlite::ffi::Error::new(rusqlite::ffi::SQLITE_BUSY),
1657            Some("database is locked".to_string()),
1658        );
1659        let wrapped_unique: anyhow::Error =
1660            anyhow::Error::from(unique_err).context("append signed_event");
1661        let wrapped_other: anyhow::Error =
1662            anyhow::Error::from(other_err).context("append signed_event");
1663        assert!(is_unique_constraint_race(&wrapped_unique));
1664        assert!(!is_unique_constraint_race(&wrapped_other));
1665
1666        // Non-rusqlite errors are never UNIQUE races.
1667        let plain: anyhow::Error = anyhow::anyhow!("plain error");
1668        assert!(!is_unique_constraint_race(&plain));
1669    }
1670
1671    /// `dlq_size` MUST return 0 on a fresh DB (no DLQ rows).
1672    #[tokio::test]
1673    async fn dlq_size_returns_zero_on_fresh_db() {
1674        let dir = fresh_tempdir();
1675        let db_path = dir.path().join("dlq-empty.db");
1676        let conn = crate::db::open(&db_path).expect("init db");
1677        assert_eq!(dlq_size(&conn).expect("dlq_size on fresh"), 0);
1678    }
1679
1680    // -----------------------------------------------------------------
1681    // Metrics — getter coverage
1682    // -----------------------------------------------------------------
1683
1684    #[test]
1685    fn metrics_default_returns_zeroes() {
1686        let m = DeferredAuditMetrics::default();
1687        assert_eq!(m.submitted_count(), 0);
1688        assert_eq!(m.appended_count(), 0);
1689        assert_eq!(m.send_failure_count(), 0);
1690        assert_eq!(m.append_failure_count(), 0);
1691        assert_eq!(m.panic_count(), 0);
1692        assert_eq!(m.dlq_landed_count(), 0);
1693        assert_eq!(m.unique_race_retry_count(), 0);
1694    }
1695
1696    #[test]
1697    fn metrics_clone_shares_counters() {
1698        let m1 = DeferredAuditMetrics::default();
1699        let m2 = m1.clone();
1700        m1.submitted.fetch_add(7, Ordering::Relaxed);
1701        // m2 sees the same counter — Arc<Atomic> semantics.
1702        assert_eq!(m2.submitted_count(), 7);
1703    }
1704
1705    // -----------------------------------------------------------------
1706    // GOVERNANCE_REFUSAL_EVENT_TYPE is a stable wire string
1707    // -----------------------------------------------------------------
1708
1709    #[test]
1710    fn governance_refusal_event_type_is_stable() {
1711        assert_eq!(GOVERNANCE_REFUSAL_EVENT_TYPE, "governance.refusal");
1712    }
1713}