ai_memory/governance/deferred_audit.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 Policy-Engine Item 3 — deferred audit-log queue for the
5//! storage governance pre-write hook.
6//!
7//! # The gap this closes
8//!
9//! The substrate's `GOVERNANCE_PRE_WRITE` hook (installed in
10//! `daemon_runtime::bootstrap_serve`, consulted from every
11//! `storage::insert*` call) ran
12//! [`super::agent_action::check_agent_action_no_audit`] — the
13//! `_no_audit` suffix is there because emitting a `signed_events` row
14//! from INSIDE the in-flight INSERT transaction would re-enter the
15//! same `Connection` and deadlock under the substrate's
16//! `Arc<Mutex<Connection>>` lock.
17//!
18//! Consequence: **storage refusals were typed-but-not-cryptographically-logged**.
19//! Other paths (the audited [`super::agent_action::check_agent_action`]
20//! variant) DO chain-log via `signed_events`. That asymmetry was the
21//! known gap in the bypass-impossibility audit story.
22//!
23//! # The fix — deferred chain-log
24//!
25//! This module ships a process-wide `DeferredAuditQueue`:
26//!
27//! 1. The storage hook captures the refusal verdict + agent identity
28//! + canonical action payload as a [`DeferredAuditEvent`] and
29//! submits it to the queue via [`DeferredAuditQueue::submit`]
30//! (non-blocking, never panics).
31//! 2. A background drainer task ([`spawn_drainer_task`]) owns a FRESH
32//! `Connection` (opened against the same `db_path` but NOT the
33//! substrate writer's connection — SQLite WAL allows parallel
34//! readers and the drainer's writes don't contend with the
35//! in-flight `storage::insert` transaction because it has
36//! already released its lock by the time the drainer runs).
37//! 3. For every received event, the drainer appends a
38//! `governance.refusal` row to `signed_events`. The chain-log
39//! property closes.
40//!
41//! # Supervisor pattern
42//!
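//! The drainer task is wrapped by [`spawn_supervised_drainer`], which
//! awaits the inner task's `JoinHandle` so that a drainer panic is
//! observed rather than passing silently: the panic is counted in
//! `DeferredAuditMetrics::drainer_panics` and logged at error level. A
//! panic in the drainer would otherwise silently drop the audit chain — a
//! regression worse than the original gap. Note that the supervisor does
//! NOT currently restart the inner task: the receiver is moved into the
//! panicked task and cannot be recovered, so refusals submitted after a
//! panic are recorded as `send_failures` until the queue is rebuilt or the
//! daemon is restarted. The supervisor itself is a `tokio::spawn`ed task
//! whose `JoinHandle` the shutdown path can await.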
48//!
49//! # Backpressure / lossiness
50//!
51//! The channel is `tokio::sync::mpsc::unbounded_channel` by design:
52//!
53//! - Refusals are rare (a properly-configured fleet refuses
54//! << 1% of writes).
55//! - A bounded channel would silently drop on full — and a silent
56//! audit drop IS a security regression we cannot accept.
57//! - Memory pressure under attack is bounded by the rate at which the
58//! drainer can append `signed_events` rows; on macOS / Linux a
59//! single SQLite append in WAL mode is ~25-100 microseconds, so a
60//! sustained 100k refusals/second saturates one core but never
61//! blocks the storage write path.
62//!
63//! # Graceful shutdown
64//!
//! The free function [`close_and_flush`] drops the queue handle it is
//! given and awaits the supervisor task until it terminates. The channel
//! only closes once every clone of the queue has been dropped; at that
//! point the drainer drains every still-buffered event before exiting.
//! Pending events MUST land in `signed_events` before the daemon's tokio
//! runtime is torn down, or the chain-log property is broken.
70
71use std::path::{Path, PathBuf};
72use std::sync::Arc;
73use std::sync::atomic::{AtomicU64, Ordering};
74
75use anyhow::{Context, Result};
76use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
77use tokio::task::JoinHandle;
78
79use crate::governance::agent_action::{AgentAction, Decision};
80use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
81
/// Wire-name of the deferred refusal audit row; the production sink
/// stores it as the row's `event_type`. Audit-side dashboards filter on
/// this string to tell storage-hook refusals apart from the
/// `governance.check` rows produced by the audited `check_agent_action`
/// path.
pub const GOVERNANCE_REFUSAL_EVENT_TYPE: &str = "governance.refusal";

/// Cluster-C SEC-3 (issue #767) — retry budget for a single audit append
/// that keeps losing the chain-head race.
///
/// A `SQLITE_CONSTRAINT_UNIQUE` failure on the
/// `idx_signed_events_sequence` UNIQUE index means a concurrent writer
/// claimed the same `sequence` value first. The BEGIN IMMEDIATE wrap in
/// `append_signed_event` should make that rare; this budget is the safety
/// belt. Once it is exhausted the event is written to
/// `signed_events_dlq`, so the audit drop is never silent.
const APPEND_UNIQUE_RACE_MAX_RETRIES: usize = 5;
96
/// One refusal captured by the storage hook, awaiting flush to the
/// `signed_events` chain.
///
/// Every field is owned (no borrows), so the event can cross the mpsc
/// channel boundary without lifetime gymnastics. The event does not carry
/// pre-encoded payload bytes: the canonical JSON encoding is produced on
/// demand by [`DeferredAuditEvent::canonical_bytes`], and the production
/// sink hashes those bytes into `signed_events.payload_hash`.
#[derive(Debug, Clone)]
pub struct DeferredAuditEvent {
    /// Agent identity at the moment of refusal (resolved from request
    /// or process context). Lands in `signed_events.agent_id`.
    pub agent_id: String,
    /// The action that was refused. Cloned from the hook input.
    pub action: AgentAction,
    /// The verdict. Expected to be a `Refuse` variant:
    /// [`DeferredAuditEvent::from_refusal`] (and therefore
    /// `DeferredAuditQueue::submit_refusal`) rejects anything for which
    /// `Decision::is_refusal` is false. Plain `DeferredAuditQueue::submit`
    /// and direct struct construction perform no such check.
    pub decision: Decision,
    /// Wall-clock timestamp of refusal. Lands in
    /// `signed_events.timestamp` as RFC3339.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
121
122impl DeferredAuditEvent {
123 /// Build a deferred event from the hook's three inputs. Returns
124 /// `None` when `decision` is not a refusal — callers should
125 /// only submit refusals to the queue (Allow / Warn paths do not
126 /// chain-log a refusal row).
127 #[must_use]
128 pub fn from_refusal(agent_id: &str, action: &AgentAction, decision: &Decision) -> Option<Self> {
129 if !decision.is_refusal() {
130 return None;
131 }
132 Some(Self {
133 agent_id: agent_id.to_string(),
134 action: action.clone(),
135 decision: decision.clone(),
136 timestamp: chrono::Utc::now(),
137 })
138 }
139
140 /// Extract the rule_id from the refusal verdict. Used by the
141 /// drainer to surface the firing rule in the audit row's
142 /// canonical payload.
143 #[must_use]
144 pub fn rule_id(&self) -> Option<&str> {
145 match &self.decision {
146 Decision::Refuse { rule_id, .. } => Some(rule_id.as_str()),
147 _ => None,
148 }
149 }
150
151 /// Extract the refusal reason from the verdict (verbatim
152 /// operator-authored string).
153 #[must_use]
154 pub fn reason(&self) -> Option<&str> {
155 match &self.decision {
156 Decision::Refuse { reason, .. } => Some(reason.as_str()),
157 _ => None,
158 }
159 }
160
161 /// Canonical JSON shape the drainer hashes for
162 /// `signed_events.payload_hash`. Stable across versions: a
163 /// flat object with `action`, `decision`, `agent_id`,
164 /// `timestamp` keys — same outline as
165 /// `agent_action::emit_check_event` plus the agent + timestamp.
166 ///
167 /// # Errors
168 ///
169 /// Returns an error only if `serde_json` cannot serialize the
170 /// action variant (in practice never happens for the canonical
171 /// AgentAction shapes).
172 pub fn canonical_bytes(&self) -> Result<Vec<u8>> {
173 let canonical = serde_json::json!({
174 "action": self.action,
175 "decision": self.decision,
176 "agent_id": self.agent_id,
177 "timestamp": self.timestamp.to_rfc3339(),
178 });
179 serde_json::to_vec(&canonical).context("DeferredAuditEvent::canonical_bytes")
180 }
181}
182
/// Process-wide counters describing the deferred-audit pipeline.
///
/// Every field is an `Arc<AtomicU64>`, so cloning the struct yields a
/// second handle onto the SAME counters (an `Arc` bump per field), which
/// is how the queue, the drainer and the sink all report into one set of
/// numbers.
#[derive(Debug, Clone, Default)]
pub struct DeferredAuditMetrics {
    /// Events handed to `DeferredAuditQueue::submit`. Counted before the
    /// send is attempted, so it includes events that were dropped because
    /// the receiver was already closed.
    pub submitted: Arc<AtomicU64>,
    /// Events the drainer saw appended to `signed_events`.
    pub appended: Arc<AtomicU64>,
    /// Submit attempts that failed because the receiver was already
    /// closed (drainer dropped / shutdown raced).
    pub send_failures: Arc<AtomicU64>,
    /// Drainer iterations that did not end in a chain append. The drainer
    /// bumps this both when the sink returns an error and when the event
    /// landed in the DLQ, so dashboards alerting on a non-zero value see
    /// either case.
    pub append_failures: Arc<AtomicU64>,
    /// Drainer panics observed by the supervisor. Should be zero in
    /// healthy operation.
    pub drainer_panics: Arc<AtomicU64>,
    /// Cluster-C SEC-3 (issue #767) — events written to
    /// `signed_events_dlq` after exhausting the `SQLITE_CONSTRAINT_UNIQUE`
    /// retry budget or hitting a non-race error. Cumulative since process
    /// boot; the live table depth is exposed separately through the
    /// capabilities-v3 envelope's `approval.deferred_audit_dlq_size`.
    pub dlq_landed: Arc<AtomicU64>,
    /// Cluster-C SEC-3 (issue #767) — cumulative
    /// `SQLITE_CONSTRAINT_UNIQUE` race retries observed by the production
    /// sink. Each retry means the chain-head read raced a
    /// sibling-connection writer; a sustained non-zero rate hints at
    /// write-contention pressure on the audit chain (e.g. a burst of
    /// refusals while the substrate writer is also producing
    /// `memory_link.created` rows).
    pub unique_race_retries: Arc<AtomicU64>,
}

impl DeferredAuditMetrics {
    /// Read one counter with relaxed ordering; every public accessor
    /// funnels through here so they all sample the same way.
    fn snapshot(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Events submitted since process boot.
    #[must_use]
    pub fn submitted_count(&self) -> u64 {
        Self::snapshot(&self.submitted)
    }

    /// Events successfully chain-logged since process boot.
    #[must_use]
    pub fn appended_count(&self) -> u64 {
        Self::snapshot(&self.appended)
    }

    /// Submit failures (receiver dropped) since process boot.
    #[must_use]
    pub fn send_failure_count(&self) -> u64 {
        Self::snapshot(&self.send_failures)
    }

    /// Append failures (sink error or DLQ landing) since process boot.
    #[must_use]
    pub fn append_failure_count(&self) -> u64 {
        Self::snapshot(&self.append_failures)
    }

    /// Supervisor-observed drainer panics since process boot.
    #[must_use]
    pub fn panic_count(&self) -> u64 {
        Self::snapshot(&self.drainer_panics)
    }

    /// Cluster-C SEC-3 — events that landed in `signed_events_dlq` since
    /// process boot.
    #[must_use]
    pub fn dlq_landed_count(&self) -> u64 {
        Self::snapshot(&self.dlq_landed)
    }

    /// Cluster-C SEC-3 — UNIQUE-constraint race retries observed by the
    /// production sink since process boot.
    #[must_use]
    pub fn unique_race_retry_count(&self) -> u64 {
        Self::snapshot(&self.unique_race_retries)
    }
}
266
/// Producer-side handle onto the deferred-audit channel.
///
/// Cloneable so multiple callsites (HTTP handler, MCP handler, internal
/// substrate writer) all share one queue: a clone duplicates the channel
/// sender and the metrics handle, so every clone feeds the same receiver
/// and the same counters. The channel stays open for as long as any clone
/// is alive.
#[derive(Clone)]
pub struct DeferredAuditQueue {
    // Sending half of the unbounded channel consumed by the drainer.
    sender: UnboundedSender<DeferredAuditEvent>,
    // Shared counters; `submit` bumps `submitted` / `send_failures`.
    metrics: DeferredAuditMetrics,
}
275
276impl DeferredAuditQueue {
277 /// Create a fresh queue + uninstalled receiver. The receiver
278 /// MUST be passed to [`spawn_drainer_task`] (or
279 /// [`spawn_supervised_drainer`]) for events to land — submits
280 /// against an unspawned receiver accumulate in the channel
281 /// buffer indefinitely until the receiver is consumed or
282 /// dropped.
283 #[must_use]
284 pub fn new() -> (Self, UnboundedReceiver<DeferredAuditEvent>) {
285 let (sender, receiver) = mpsc::unbounded_channel();
286 let queue = Self {
287 sender,
288 metrics: DeferredAuditMetrics::default(),
289 };
290 (queue, receiver)
291 }
292
293 /// Submit a refusal event. Non-blocking. Never panics — if the
294 /// receiver is closed the metric counter is bumped and a
295 /// tracing::warn is emitted, but the caller path is unaffected.
296 /// Returns `true` when the event was queued, `false` when the
297 /// receiver was already closed.
298 pub fn submit(&self, event: DeferredAuditEvent) -> bool {
299 self.metrics.submitted.fetch_add(1, Ordering::Relaxed);
300 match self.sender.send(event) {
301 Ok(()) => true,
302 Err(_) => {
303 self.metrics.send_failures.fetch_add(1, Ordering::Relaxed);
304 tracing::warn!(
305 "deferred_audit_queue: submit failed (drainer receiver closed); \
306 audit chain row LOST for this refusal"
307 );
308 false
309 }
310 }
311 }
312
313 /// Convenience: build + submit a refusal from the three hook
314 /// inputs. Returns `true` when an event was actually enqueued
315 /// (i.e. the verdict was a refusal AND the receiver was open).
316 pub fn submit_refusal(
317 &self,
318 agent_id: &str,
319 action: &AgentAction,
320 decision: &Decision,
321 ) -> bool {
322 let Some(event) = DeferredAuditEvent::from_refusal(agent_id, action, decision) else {
323 return false;
324 };
325 self.submit(event)
326 }
327
328 /// Observability handle. Clone-cheap; safe to expose to readers
329 /// (Prometheus scrape, MCP `governance_state` tool, etc.).
330 #[must_use]
331 pub fn metrics(&self) -> DeferredAuditMetrics {
332 self.metrics.clone()
333 }
334
335 /// True when the drainer receiver is still attached. False when
336 /// the supervisor task and its receiver have both terminated
337 /// (shutdown complete).
338 #[must_use]
339 pub fn is_open(&self) -> bool {
340 !self.sender.is_closed()
341 }
342}
343
344// ---------------------------------------------------------------------------
345// Drainer / supervisor
346// ---------------------------------------------------------------------------
347
/// Outcome of one drainer-side append attempt that did not fail outright.
///
/// Cluster-C SEC-3 (issue #767) — together with the `Err` case of
/// [`DeferredAuditSink::append`], this splits a sink's per-event verdict
/// into three buckets so the drainer's metrics and DLQ accounting line up
/// with the operator-facing semantics:
///
/// * [`AppendOutcome::Appended`] — the row landed in `signed_events` and
///   the chain advanced one step. The drainer increments `appended`.
/// * [`AppendOutcome::DlqLanded`] — the append exhausted its race-retry
///   budget or hit a non-race error, and the sink wrote the event into
///   `signed_events_dlq` with the failure reason captured. The production
///   sink increments `dlq_landed` and the drainer additionally increments
///   `append_failures`. The audit chain itself does NOT advance, but the
///   row is recoverable post-mortem.
/// * `Err(_)` from `append` — the sink could not even land the event in
///   the DLQ (e.g. DB file gone, schema mismatch). The drainer increments
///   `append_failures`; the chain-log property has truly broken for this
///   event and an operator must intervene.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendOutcome {
    /// Event landed in `signed_events`.
    Appended,
    /// Event landed in `signed_events_dlq` after exhausting retries
    /// or hitting an unrecoverable non-race error.
    DlqLanded,
}
373
/// Sink trait: an abstraction over "where the drainer writes the audit
/// row".
///
/// The production implementation ([`SqliteSignedEventsSink`]) writes
/// through `signed_events` with a DLQ fallback; tests can substitute a
/// mock sink to assert per-event behavior or to inject panics for
/// supervisor coverage.
///
/// `append` takes `&mut self` so a test sink can record events into an
/// owned `Vec` without interior mutability. The `Send + 'static` bounds
/// are what allow a sink to be moved into a spawned drainer task.
pub trait DeferredAuditSink: Send + 'static {
    /// Persist one event.
    ///
    /// Called by the drainer once per received event, in receive order.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when the sink cannot even land the event in
    /// its DLQ surface — a genuinely unrecoverable case (DB file gone,
    /// schema mismatch). Soft failures (race retries, a failed
    /// `signed_events` INSERT followed by a successful DLQ land) are
    /// reported via [`AppendOutcome::DlqLanded`].
    fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome>;
}
396
/// Production sink: appends a `governance.refusal` row to
/// `signed_events` for each event, using its own SQLite `Connection`
/// opened against the daemon's database path.
///
/// The connection is private to this sink — NOT shared with the substrate
/// writer — so appending never re-enters the writer's connection.
///
/// NOTE(review): earlier wording said WAL mode lets these appends proceed
/// "without lock contention". WAL allows readers to run alongside a
/// writer, but SQLite still admits a single writer at a time, so appends
/// can contend with the substrate writer's INSERTs — confirm the
/// busy-timeout / locking configuration applied by `crate::db::open` and
/// `append_signed_event`.
///
/// # Cluster-C SEC-3 (issue #767) — race-retry + DLQ
///
/// The sink wraps `append_signed_event` in a bounded retry loop. A
/// `SQLITE_CONSTRAINT_UNIQUE` failure on the `idx_signed_events_sequence`
/// UNIQUE index is the recoverable race-only signal (two writers
/// computed the same `next_seq` simultaneously); the sink re-runs
/// `append_signed_event` (which re-reads the chain head) up to
/// [`APPEND_UNIQUE_RACE_MAX_RETRIES`] times. Any other rusqlite
/// error, or a retry budget exhaustion, lands the event in
/// `signed_events_dlq` and returns `Ok(AppendOutcome::DlqLanded)` so
/// the drainer can update its counters without crashing.
pub struct SqliteSignedEventsSink {
    // Database file the lazily-opened connection points at.
    db_path: PathBuf,
    // `None` until the first append needs a connection.
    conn: Option<rusqlite::Connection>,
    // Optional counters for DLQ landings and race retries.
    metrics: Option<DeferredAuditMetrics>,
}
421
422impl SqliteSignedEventsSink {
423 /// Construct without opening — the connection is opened lazily
424 /// on first `append`. This pattern lets the supervisor restart
425 /// the sink across drainer respawns without holding a closed
426 /// `Connection` handle.
427 ///
428 /// Built without a metrics handle; race-retry + DLQ-land events
429 /// will not increment any external counters. Use
430 /// [`Self::with_metrics`] (or [`spawn_supervised_drainer`] which
431 /// threads the metrics handle automatically) for full
432 /// observability.
433 #[must_use]
434 pub fn new(db_path: impl Into<PathBuf>) -> Self {
435 Self {
436 db_path: db_path.into(),
437 conn: None,
438 metrics: None,
439 }
440 }
441
442 /// Construct a sink wired to a metrics handle. The handle's
443 /// `dlq_landed` and `unique_race_retries` counters are bumped as
444 /// the sink observes those outcomes.
445 #[must_use]
446 pub fn with_metrics(db_path: impl Into<PathBuf>, metrics: DeferredAuditMetrics) -> Self {
447 Self {
448 db_path: db_path.into(),
449 conn: None,
450 metrics: Some(metrics),
451 }
452 }
453
454 fn ensure_conn(&mut self) -> Result<&rusqlite::Connection> {
455 if self.conn.is_none() {
456 let conn = crate::db::open(&self.db_path).with_context(|| {
457 format!(
458 "SqliteSignedEventsSink: open {} for deferred-audit drainer",
459 self.db_path.display()
460 )
461 })?;
462 self.conn = Some(conn);
463 }
464 // We just inserted Some — unwrap is safe.
465 Ok(self.conn.as_ref().expect("conn populated above"))
466 }
467
468 fn bump_dlq(&self) {
469 if let Some(m) = &self.metrics {
470 m.dlq_landed.fetch_add(1, Ordering::Relaxed);
471 }
472 }
473
474 fn bump_race_retry(&self) {
475 if let Some(m) = &self.metrics {
476 m.unique_race_retries.fetch_add(1, Ordering::Relaxed);
477 }
478 }
479}
480
481impl DeferredAuditSink for SqliteSignedEventsSink {
482 fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
483 let bytes = event
484 .canonical_bytes()
485 .context("SqliteSignedEventsSink: canonical_bytes")?;
486 let hash = payload_hash(&bytes);
487 // v0.7.0 #1035 — sign the payload_hash with the daemon's
488 // process-wide audit key when installed. The forensic JSONL
489 // sink and this SQL sink share the same key (installed by
490 // `audit::init`); a downstream auditor with the daemon's
491 // verifying key validates both chains uniformly.
492 let (signature, attest_level) =
493 match crate::governance::audit::try_sign_audit_payload(&hash) {
494 Some((sig, level)) => (Some(sig), level.to_string()),
495 None => (
496 None,
497 crate::models::AttestLevel::Unsigned.as_str().to_string(),
498 ),
499 };
500 let signed = SignedEvent {
501 id: uuid::Uuid::new_v4().to_string(),
502 agent_id: event.agent_id.clone(),
503 event_type: GOVERNANCE_REFUSAL_EVENT_TYPE.to_string(),
504 payload_hash: hash,
505 signature,
506 attest_level,
507 timestamp: event.timestamp.to_rfc3339(),
508 ..SignedEvent::default()
509 };
510
511 // Race-retry loop: SQLITE_CONSTRAINT_UNIQUE on the
512 // `idx_signed_events_sequence` index is the race-only failure
513 // mode. Every other rusqlite error path bails to DLQ.
514 let conn_path = self.db_path.clone();
515 let mut last_err: Option<anyhow::Error> = None;
516 for attempt in 0..=APPEND_UNIQUE_RACE_MAX_RETRIES {
517 let conn = self.ensure_conn()?;
518 match append_signed_event(conn, &signed) {
519 Ok(()) => return Ok(AppendOutcome::Appended),
520 Err(e) => {
521 if is_unique_constraint_race(&e) && attempt < APPEND_UNIQUE_RACE_MAX_RETRIES {
522 self.bump_race_retry();
523 tracing::warn!(
524 attempt = attempt + 1,
525 db = %conn_path.display(),
526 "deferred_audit sink: SQLITE_CONSTRAINT_UNIQUE on signed_events.sequence — \
527 chain-head race; retrying (budget {APPEND_UNIQUE_RACE_MAX_RETRIES})"
528 );
529 last_err = Some(e);
530 continue;
531 }
532 last_err = Some(e);
533 break;
534 }
535 }
536 }
537
538 // Either the retry budget was exhausted on UNIQUE races OR
539 // the first attempt produced a non-race error. Land in DLQ.
540 //
541 // v0.7.0 #1046 (Agent-6 #7) — chain-log property advisory.
542 // At v0.7.0 the audit-chain delivery contract is:
543 //
544 // **chain-log emission is "exactly-once OR DLQ-recoverable"**
545 //
546 // Specifically:
547 // - On the happy path, every refusal lands exactly one
548 // `governance.refusal` row in `signed_events` (the
549 // append-only chain advances by one).
550 // - On the DLQ-landing path (this branch), the refusal
551 // row does NOT advance the chain — instead a row lands
552 // in `signed_events_dlq` with the same `id`,
553 // `agent_id`, `payload_hash`, and `failure_reason`.
554 // The DLQ row preserves enough state to replay back
555 // into the chain.
556 // - There is NO boot-time DLQ→chain replay path at
557 // v0.7.0. Operators with non-zero `dlq_landed_count`
558 // should run the operator-side replay tooling (or
559 // manual SQL: copy DLQ rows into `signed_events` after
560 // resolving the chain-head race condition that caused
561 // the DLQ landing).
562 //
563 // A future v0.8 change can wire a boot-time DLQ replay
564 // sweep that retries every `signed_events_dlq` entry into
565 // `signed_events` via `append_signed_event`. The current
566 // chain-log property is the load-bearing contract: the
567 // cryptographic chain never carries a "phantom" refusal
568 // (every chain row is a real append), and DLQ rows are
569 // recoverable via operator action.
570 let err = last_err.unwrap_or_else(|| anyhow::anyhow!("unknown drainer sink error"));
571 let failure_reason = format!("{err:#}");
572 let conn = self.ensure_conn()?;
573 conn.execute(
574 "INSERT INTO signed_events_dlq \
575 (id, agent_id, event_type, payload_hash, signature, attest_level, \
576 timestamp, failure_reason, failed_at) \
577 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
578 rusqlite::params![
579 signed.id,
580 signed.agent_id,
581 signed.event_type,
582 signed.payload_hash,
583 signed.signature,
584 signed.attest_level,
585 signed.timestamp,
586 failure_reason,
587 chrono::Utc::now().to_rfc3339(),
588 ],
589 )
590 .context("SqliteSignedEventsSink: DLQ insert")?;
591 tracing::error!(
592 failure_reason = %failure_reason,
593 agent_id = %signed.agent_id,
594 event_id = %signed.id,
595 "deferred_audit sink: append exhausted retries or hit non-race error — \
596 event landed in signed_events_dlq (chain row NOT advanced; replay needed)"
597 );
598 self.bump_dlq();
599 Ok(AppendOutcome::DlqLanded)
600 }
601}
602
603/// Cluster-C SEC-3 (issue #767) — classify an `anyhow::Error` produced
604/// by `append_signed_event` as a recoverable `SQLITE_CONSTRAINT_UNIQUE`
605/// chain-head race vs everything else.
606///
607/// The classification walks the chain of causes and matches on
608/// `rusqlite::Error::SqliteFailure` with extended code
609/// `SQLITE_CONSTRAINT_UNIQUE` (2067). We deliberately use the typed
610/// matcher rather than string-matching the rendered error so a future
611/// rusqlite version that tweaks the `Display` impl doesn't silently
612/// flip every race into a DLQ-land.
613fn is_unique_constraint_race(err: &anyhow::Error) -> bool {
614 for cause in err.chain() {
615 if let Some(rusqlite::Error::SqliteFailure(code, _)) =
616 cause.downcast_ref::<rusqlite::Error>()
617 {
618 if code.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE {
619 return true;
620 }
621 }
622 }
623 false
624}
625
626/// Cluster-C SEC-3 (issue #767) — live count of rows in
627/// `signed_events_dlq`.
628///
629/// Surfaced via the capabilities-v3 envelope's
630/// `approval.deferred_audit_dlq_size` field so operator dashboards
631/// see the current DLQ depth without scraping logs. Returns `0` on
632/// query failure (a missing table or transient lock); callers that
633/// want hard-fail behavior should query the table directly.
634///
635/// # Errors
636///
637/// Returns the underlying `rusqlite` error if the SELECT fails for a
638/// reason other than `SQLITE_NOMEM` (which we treat as transient).
639/// The capabilities pathway treats this as best-effort and falls
640/// through to 0 on error.
641pub fn dlq_size(conn: &rusqlite::Connection) -> Result<u64> {
642 let count: i64 = conn
643 .query_row("SELECT COUNT(*) FROM signed_events_dlq", [], |r| r.get(0))
644 .context("dlq_size: SELECT COUNT")?;
645 Ok(u64::try_from(count).unwrap_or(0))
646}
647
648/// Spawn a single drainer iteration. The returned `JoinHandle`
649/// completes (Ok) when the channel sender is dropped AND the
650/// receiver has been fully drained — graceful shutdown. A panic in
651/// the sink propagates through the `JoinHandle` (the
652/// [`spawn_supervised_drainer`] wrapper catches it and respawns).
653///
654/// Use [`spawn_supervised_drainer`] in production. This bare entry
655/// point is exposed for tests that want one-shot drainer behavior
656/// without supervisor restart.
657#[must_use]
658pub fn spawn_drainer_task<S: DeferredAuditSink + 'static>(
659 mut receiver: UnboundedReceiver<DeferredAuditEvent>,
660 mut sink: S,
661 metrics: DeferredAuditMetrics,
662) -> JoinHandle<UnboundedReceiver<DeferredAuditEvent>> {
663 tokio::spawn(async move {
664 while let Some(event) = receiver.recv().await {
665 match sink.append(&event) {
666 Ok(AppendOutcome::Appended) => {
667 metrics.appended.fetch_add(1, Ordering::Relaxed);
668 }
669 Ok(AppendOutcome::DlqLanded) => {
670 // Cluster-C SEC-3: the sink already captured the
671 // event in `signed_events_dlq` and bumped its
672 // own dlq_landed metric (if wired). We also bump
673 // `append_failures` so existing dashboards that
674 // alert on a non-zero append_failures count
675 // still surface DLQ landings — operators read
676 // the SAME signal regardless of which bucket
677 // the row went into.
678 metrics.append_failures.fetch_add(1, Ordering::Relaxed);
679 tracing::warn!(
680 "deferred_audit drainer: event landed in DLQ \
681 (audit chain row NOT advanced; operator replay needed)"
682 );
683 }
684 Err(e) => {
685 metrics.append_failures.fetch_add(1, Ordering::Relaxed);
686 tracing::error!(
687 "deferred_audit drainer: sink.append failed (no DLQ landing either): {:#}",
688 e
689 );
690 // We don't requeue — the channel is single-consumer.
691 // The supervisor's panic-restart path is for sink
692 // PANICS (poisoned state); soft errors here are
693 // recorded and the loop continues.
694 }
695 }
696 }
697 // Sender dropped + channel drained → graceful shutdown. Return
698 // the receiver so the supervisor (which owns the sender
699 // ultimately via the queue handle) can drop it cleanly.
700 receiver
701 })
702}
703
704/// Supervisor: spawns the drainer with panic recovery. Any panic
705/// caught at the `JoinHandle::is_panic()` boundary triggers a
706/// respawn with a FRESH sink (`make_sink()`), preserving the
707/// receiver and the metrics handle.
708///
709/// The supervisor task returns when either:
710/// - The channel sender is dropped and the channel is fully
711/// drained (graceful shutdown).
712/// - `max_restarts` consecutive panics occur (default `u32::MAX`
713/// — effectively never gives up; an operator that wants to
714/// fail loudly on persistent panics can configure a finite
715/// limit).
716///
717/// The returned `JoinHandle` resolves when the supervisor exits.
718#[must_use]
719pub fn spawn_supervised_drainer<F, S>(
720 receiver: UnboundedReceiver<DeferredAuditEvent>,
721 make_sink: F,
722 metrics: DeferredAuditMetrics,
723 max_restarts: u32,
724) -> JoinHandle<()>
725where
726 F: Fn() -> S + Send + 'static,
727 S: DeferredAuditSink + 'static,
728{
729 tokio::spawn(async move {
730 // Drainer iteration. The receiver lives inside the spawned
731 // task; on graceful shutdown the task returns it back to
732 // us. On panic the receiver is lost — see the
733 // documentation block for the supervisor restart pattern.
734 let sink = make_sink();
735 let handle = spawn_drainer_task(receiver, sink, metrics.clone());
736 match handle.await {
737 Ok(returned_receiver) => {
738 // Drainer exited gracefully — sender dropped + drained.
739 drop(returned_receiver);
740 }
741 Err(join_err) if join_err.is_panic() => {
742 metrics.drainer_panics.fetch_add(1, Ordering::Relaxed);
743 tracing::error!(
744 "deferred_audit supervisor: drainer task panicked ({join_err}); \
745 max_restarts={max_restarts} — receiver moved into the panicked \
746 task and cannot be recovered; future refusals submitted to the \
747 existing queue will fail to land. Operator action required: \
748 rebuild the daemon's deferred-audit queue (or restart the daemon) \
749 to restore the audit-chain property."
750 );
751 // We cannot loop without a valid receiver. The
752 // max_restarts variable is preserved in the API so
753 // future revisions can introduce a buffering scheme
754 // that lets the supervisor recover the in-flight
755 // events; today the contract is "panic in drainer
756 // = chain loss on the unflushed buffer, future
757 // events recorded as send_failures".
758 let _ = max_restarts;
759 }
760 Err(join_err) => {
761 // Cancellation (task aborted) — treat as shutdown.
762 tracing::warn!(
763 "deferred_audit supervisor: drainer aborted ({join_err}); \
764 pending events may be lost"
765 );
766 }
767 }
768 })
769}
770
771/// Close the queue and wait for the supervisor task to drain every
772/// pending event. After this returns the chain-log property is
773/// "every refusal submitted before close lands in `signed_events`."
774///
775/// # Errors
776///
777/// Returns the `tokio::task::JoinError` if the supervisor task
778/// panicked while draining (rare — the supervisor catches drainer
779/// panics, but its own panic would surface here).
780pub async fn close_and_flush(
781 queue: DeferredAuditQueue,
782 supervisor: JoinHandle<()>,
783) -> std::result::Result<(), tokio::task::JoinError> {
784 // Drop the producer-side sender — once every clone is dropped,
785 // the receiver's `recv().await` returns None and the drainer
786 // exits gracefully.
787 drop(queue);
788 supervisor.await
789}
790
/// Whole-seconds magnitude behind [`DEFAULT_SHUTDOWN_DRAIN_TIMEOUT`]. Kept
/// as its own named constant so the `Duration` definition carries no
/// inline literal.
const SHUTDOWN_DRAIN_TIMEOUT_SECS: u64 = 5;

/// Default upper bound handed to [`drain_pending`] on the daemon's
/// shutdown path: how long to wait for the drainer to flush every
/// submitted refusal into `signed_events` before giving up and moving on
/// to WAL checkpoint + exit. Five seconds comfortably covers a
/// multi-thousand-event backlog at the sink's ~25-100
/// microsecond-per-append rate while still bounding shutdown latency if
/// the drainer is genuinely wedged.
pub const DEFAULT_SHUTDOWN_DRAIN_TIMEOUT: std::time::Duration =
    std::time::Duration::from_secs(SHUTDOWN_DRAIN_TIMEOUT_SECS);
803
/// Whole-milliseconds magnitude behind [`DRAIN_POLL_INTERVAL`].
const DRAIN_POLL_INTERVAL_MILLIS: u64 = 10;

/// How often [`drain_pending`] re-samples the drainer's atomic counters.
/// Those counters advance from another task, so the shutdown path checks
/// them on this cadence instead of busy-spinning.
const DRAIN_POLL_INTERVAL: std::time::Duration =
    std::time::Duration::from_millis(DRAIN_POLL_INTERVAL_MILLIS);
812
813/// Wait (bounded) until every event submitted to the queue has been
814/// accounted for by the drainer — appended to `signed_events`, landed in
815/// the DLQ / counted as an append failure, or recorded as a send failure.
816///
817/// This is the daemon shutdown-path counterpart to [`close_and_flush`].
818/// Unlike `close_and_flush`, it does NOT require the producer-side senders
819/// to be dropped: the daemon installs the queue's sender clones inside
820/// process-wide `OnceLock` governance hooks
821/// (`storage::GOVERNANCE_PRE_WRITE`, `wire_check::GOVERNANCE_PRE_ACTION`)
822/// that live for the entire process lifetime, so the channel never closes
823/// and the drainer's `recv().await` never returns `None`. Awaiting the
824/// supervisor would therefore block forever. Instead we wait for the
825/// drainer to catch up to the submitted count by polling the shared atomic
826/// metrics.
827///
828/// MUST be called only after every write path that can submit a refusal
829/// has quiesced (i.e. after the HTTP server's graceful-shutdown future has
830/// resolved). At that point `submitted` is final, so the loop terminates
831/// as soon as the drainer finishes the backlog.
832///
833/// Returns `true` when the queue fully drained within `timeout`, `false`
834/// when the timeout elapsed with events still in flight (the caller should
835/// log the residual so the audit gap is visible to operators).
836pub async fn drain_pending(metrics: &DeferredAuditMetrics, timeout: std::time::Duration) -> bool {
837 let deadline = tokio::time::Instant::now() + timeout;
838 loop {
839 if drain_accounted(metrics) >= metrics.submitted_count() {
840 return true;
841 }
842 if tokio::time::Instant::now() >= deadline {
843 return false;
844 }
845 tokio::time::sleep(DRAIN_POLL_INTERVAL).await;
846 }
847}
848
849/// Number of submitted events the drainer has finished with — every
850/// received event bumps exactly one of `appended` / `append_failures`
851/// (the latter also covers DLQ landings), and a closed receiver bumps
852/// `send_failures`. The sum equalling `submitted` means the backlog is
853/// fully processed.
854#[must_use]
855fn drain_accounted(metrics: &DeferredAuditMetrics) -> u64 {
856 metrics
857 .appended_count()
858 .saturating_add(metrics.append_failure_count())
859 .saturating_add(metrics.send_failure_count())
860}
861
862// ---------------------------------------------------------------------------
863// Convenience installer for the daemon path
864// ---------------------------------------------------------------------------
865
866/// Build a queue + spawn a supervised drainer in one call. Returns
867/// the producer handle and the supervisor `JoinHandle` — the daemon
868/// stashes the queue on `AppState` and the join handle in
869/// `task_handles` so `serve` aborts it on shutdown.
870///
871/// The drainer opens a FRESH `Connection` per its sink (via
872/// `SqliteSignedEventsSink::new(db_path)`); on respawn after panic
873/// the sink is rebuilt verbatim. No connection is shared with the
874/// substrate writer.
875#[must_use]
876pub fn install_deferred_audit_drainer(db_path: &Path) -> (DeferredAuditQueue, JoinHandle<()>) {
877 let (queue, receiver) = DeferredAuditQueue::new();
878 let metrics = queue.metrics();
879 let db_path_buf = db_path.to_path_buf();
880 let metrics_for_factory = metrics.clone();
881 let supervisor = spawn_supervised_drainer(
882 receiver,
883 move || {
884 SqliteSignedEventsSink::with_metrics(db_path_buf.clone(), metrics_for_factory.clone())
885 },
886 metrics,
887 u32::MAX,
888 );
889 (queue, supervisor)
890}
891
892// ---------------------------------------------------------------------------
893// Tests
894// ---------------------------------------------------------------------------
895
896#[cfg(test)]
897mod tests {
898 use super::*;
899 use std::sync::Mutex;
900
901 // -----------------------------------------------------------------
902 // DeferredAuditEvent shape + canonical bytes
903 // -----------------------------------------------------------------
904
905 fn refusal_action() -> AgentAction {
906 AgentAction::Custom {
907 custom_kind: "memory_write".to_string(),
908 payload: serde_json::json!({"namespace": "secrets/api"}),
909 }
910 }
911
912 fn refusal_decision() -> Decision {
913 Decision::Refuse {
914 rule_id: "R001".to_string(),
915 reason: "no writes to secrets/*".to_string(),
916 }
917 }
918
919 #[test]
920 fn from_refusal_returns_some_for_refuse() {
921 let event =
922 DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &refusal_decision())
923 .expect("must be Some for Refuse verdict");
924 assert_eq!(event.agent_id, "agent:alice");
925 assert_eq!(event.rule_id(), Some("R001"));
926 assert_eq!(event.reason(), Some("no writes to secrets/*"));
927 }
928
929 #[test]
930 fn from_refusal_returns_none_for_allow() {
931 let event =
932 DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &Decision::Allow);
933 assert!(event.is_none(), "Allow verdict must not enqueue an event");
934 }
935
936 #[test]
937 fn from_refusal_returns_none_for_warn() {
938 let warn = Decision::Warn {
939 rule_id: "W001".to_string(),
940 reason: "warning only".to_string(),
941 };
942 let event = DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &warn);
943 assert!(event.is_none(), "Warn verdict must not enqueue a refusal");
944 }
945
946 #[test]
947 fn canonical_bytes_includes_rule_and_action() {
948 let event =
949 DeferredAuditEvent::from_refusal("agent:alice", &refusal_action(), &refusal_decision())
950 .unwrap();
951 let bytes = event.canonical_bytes().unwrap();
952 let s = std::str::from_utf8(&bytes).unwrap();
953 assert!(s.contains("R001"), "canonical payload must include rule_id");
954 assert!(
955 s.contains("memory_write"),
956 "canonical payload must include action kind"
957 );
958 assert!(
959 s.contains("agent:alice"),
960 "canonical payload must include agent id"
961 );
962 }
963
964 #[test]
965 fn rule_id_returns_none_for_non_refusal() {
966 let event = DeferredAuditEvent {
967 agent_id: "x".into(),
968 action: refusal_action(),
969 decision: Decision::Allow,
970 timestamp: chrono::Utc::now(),
971 };
972 assert!(event.rule_id().is_none());
973 assert!(event.reason().is_none());
974 }
975
976 // -----------------------------------------------------------------
977 // DeferredAuditQueue submit + non-blocking semantics
978 // -----------------------------------------------------------------
979
980 #[tokio::test]
981 async fn queue_new_returns_open_handle() {
982 let (queue, _rx) = DeferredAuditQueue::new();
983 assert!(queue.is_open());
984 assert_eq!(queue.metrics().submitted_count(), 0);
985 }
986
987 #[tokio::test]
988 async fn submit_with_receiver_attached_succeeds() {
989 let (queue, mut rx) = DeferredAuditQueue::new();
990 let event =
991 DeferredAuditEvent::from_refusal("agent:t", &refusal_action(), &refusal_decision())
992 .unwrap();
993 assert!(queue.submit(event.clone()));
994 assert_eq!(queue.metrics().submitted_count(), 1);
995 let received = rx.recv().await.unwrap();
996 assert_eq!(received.agent_id, event.agent_id);
997 assert_eq!(received.rule_id(), Some("R001"));
998 }
999
1000 #[tokio::test]
1001 async fn submit_after_receiver_dropped_records_send_failure() {
1002 let (queue, rx) = DeferredAuditQueue::new();
1003 drop(rx);
1004 // sender knows its peer is gone
1005 assert!(!queue.is_open());
1006 let event =
1007 DeferredAuditEvent::from_refusal("agent:t", &refusal_action(), &refusal_decision())
1008 .unwrap();
1009 let ok = queue.submit(event);
1010 assert!(!ok, "submit must return false when receiver is closed");
1011 assert_eq!(queue.metrics().submitted_count(), 1);
1012 assert_eq!(queue.metrics().send_failure_count(), 1);
1013 }
1014
1015 #[tokio::test]
1016 async fn submit_refusal_helper_skips_non_refusals() {
1017 let (queue, mut rx) = DeferredAuditQueue::new();
1018 // Allow does NOT enqueue
1019 let enq = queue.submit_refusal("agent:t", &refusal_action(), &Decision::Allow);
1020 assert!(!enq);
1021 // Try receiving — should timeout (channel empty)
1022 let recv = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1023 assert!(recv.is_err(), "no event should have been enqueued");
1024 // Refusal DOES enqueue
1025 let enq2 = queue.submit_refusal("agent:t", &refusal_action(), &refusal_decision());
1026 assert!(enq2);
1027 let event = rx.recv().await.unwrap();
1028 assert_eq!(event.agent_id, "agent:t");
1029 }
1030
1031 #[tokio::test]
1032 async fn queue_clone_shares_underlying_channel() {
1033 let (queue, mut rx) = DeferredAuditQueue::new();
1034 let clone = queue.clone();
1035 let event1 =
1036 DeferredAuditEvent::from_refusal("agent:a", &refusal_action(), &refusal_decision())
1037 .unwrap();
1038 let event2 =
1039 DeferredAuditEvent::from_refusal("agent:b", &refusal_action(), &refusal_decision())
1040 .unwrap();
1041 queue.submit(event1);
1042 clone.submit(event2);
1043 let r1 = rx.recv().await.unwrap();
1044 let r2 = rx.recv().await.unwrap();
1045 let agents: Vec<_> = vec![r1.agent_id, r2.agent_id];
1046 assert!(agents.contains(&"agent:a".to_string()));
1047 assert!(agents.contains(&"agent:b".to_string()));
1048 // Both sides share the same metrics handle
1049 assert_eq!(queue.metrics().submitted_count(), 2);
1050 assert_eq!(clone.metrics().submitted_count(), 2);
1051 }
1052
1053 // -----------------------------------------------------------------
1054 // Drainer task behavior with mock sink
1055 // -----------------------------------------------------------------
1056
    /// Mock sink for drainer/supervisor tests: records every event it is
    /// given so the test can assert on them afterwards, and can be
    /// configured to panic or to return an error on one specific call
    /// (used to drive the supervisor-recovery and error-path tests).
    #[derive(Clone, Default)]
    struct MockSink {
        // Events accepted so far. Shared behind a mutex so the test body
        // can inspect them while the drainer task holds the sink.
        recorded: Arc<Mutex<Vec<DeferredAuditEvent>>>,
        // When `Some(n)`, the call with zero-based index `n` panics.
        panic_on: Option<usize>,
        // When `Some(n)`, the call with zero-based index `n` returns `Err`.
        error_on: Option<usize>,
        // Number of `append` calls made so far. Kept in an `Arc` so that
        // clones of the sink count against the same total.
        call_count: Arc<AtomicU64>,
    }
1073
1074 impl DeferredAuditSink for MockSink {
1075 fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
1076 let prior = self.call_count.fetch_add(1, Ordering::SeqCst) as usize;
1077 if Some(prior) == self.panic_on {
1078 panic!("mock sink: configured panic at call {prior}");
1079 }
1080 if Some(prior) == self.error_on {
1081 return Err(anyhow::anyhow!(
1082 "mock sink: configured error at call {prior}"
1083 ));
1084 }
1085 self.recorded.lock().unwrap().push(event.clone());
1086 Ok(AppendOutcome::Appended)
1087 }
1088 }
1089
1090 #[tokio::test]
1091 async fn drainer_appends_every_submitted_event() {
1092 let (queue, rx) = DeferredAuditQueue::new();
1093 let metrics = queue.metrics();
1094 let sink = MockSink::default();
1095 let recorded = sink.recorded.clone();
1096 let handle = spawn_drainer_task(rx, sink, metrics.clone());
1097
1098 for i in 0..5 {
1099 let mut event = DeferredAuditEvent::from_refusal(
1100 &format!("agent:{i}"),
1101 &refusal_action(),
1102 &refusal_decision(),
1103 )
1104 .unwrap();
1105 event.timestamp = chrono::Utc::now();
1106 queue.submit(event);
1107 }
1108
1109 // Drop the queue (sender) to terminate the drainer.
1110 drop(queue);
1111 let _returned_rx = handle.await.unwrap();
1112
1113 let recorded = recorded.lock().unwrap();
1114 assert_eq!(recorded.len(), 5);
1115 for (i, ev) in recorded.iter().enumerate() {
1116 assert_eq!(ev.agent_id, format!("agent:{i}"));
1117 }
1118 assert_eq!(metrics.appended_count(), 5);
1119 }
1120
1121 #[tokio::test]
1122 async fn drainer_continues_after_sink_error() {
1123 // Sink errors on the second call; the drainer should
1124 // record the error in metrics and proceed to handle
1125 // subsequent events.
1126 let (queue, rx) = DeferredAuditQueue::new();
1127 let metrics = queue.metrics();
1128 let mut sink = MockSink::default();
1129 sink.error_on = Some(1);
1130 let recorded = sink.recorded.clone();
1131 let handle = spawn_drainer_task(rx, sink, metrics.clone());
1132
1133 for i in 0..3 {
1134 let event = DeferredAuditEvent::from_refusal(
1135 &format!("agent:{i}"),
1136 &refusal_action(),
1137 &refusal_decision(),
1138 )
1139 .unwrap();
1140 queue.submit(event);
1141 }
1142 drop(queue);
1143 let _ = handle.await.unwrap();
1144 // Event 0 and 2 landed; event 1 hit the error.
1145 let recorded = recorded.lock().unwrap();
1146 assert_eq!(recorded.len(), 2);
1147 assert_eq!(metrics.appended_count(), 2);
1148 assert_eq!(metrics.append_failure_count(), 1);
1149 }
1150
1151 // -----------------------------------------------------------------
1152 // Supervisor: panic recovery + graceful shutdown
1153 // -----------------------------------------------------------------
1154
1155 #[tokio::test]
1156 async fn supervisor_records_panic_metric_on_drainer_panic() {
1157 // Sink panics on first call. The supervisor catches the
1158 // panic, bumps drainer_panics, and (per current
1159 // implementation) terminates after the panic — the receiver
1160 // moved into the panicked task and cannot be recovered.
1161 // We verify the panic-counter side-effect.
1162 let (queue, rx) = DeferredAuditQueue::new();
1163 let metrics = queue.metrics();
1164 let panic_on = Some(0_usize);
1165 let supervisor = spawn_supervised_drainer(
1166 rx,
1167 move || MockSink {
1168 recorded: Arc::new(Mutex::new(Vec::new())),
1169 panic_on,
1170 error_on: None,
1171 call_count: Arc::new(AtomicU64::new(0)),
1172 },
1173 metrics.clone(),
1174 1, // max 1 restart (= no respawn beyond the initial spawn)
1175 );
1176
1177 let event =
1178 DeferredAuditEvent::from_refusal("agent:panic", &refusal_action(), &refusal_decision())
1179 .unwrap();
1180 queue.submit(event);
1181 // Wait for the supervisor to observe the panic and exit.
1182 let _ = tokio::time::timeout(std::time::Duration::from_secs(2), supervisor)
1183 .await
1184 .expect("supervisor must exit after observing panic");
1185 assert_eq!(
1186 metrics.panic_count(),
1187 1,
1188 "supervisor must record exactly one drainer panic"
1189 );
1190 }
1191
1192 #[tokio::test]
1193 async fn supervisor_graceful_shutdown_drains_buffered_events() {
1194 let (queue, rx) = DeferredAuditQueue::new();
1195 let metrics = queue.metrics();
1196 let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
1197 let recorded_for_factory = recorded.clone();
1198 let supervisor = spawn_supervised_drainer(
1199 rx,
1200 move || MockSink {
1201 recorded: recorded_for_factory.clone(),
1202 panic_on: None,
1203 error_on: None,
1204 call_count: Arc::new(AtomicU64::new(0)),
1205 },
1206 metrics.clone(),
1207 u32::MAX,
1208 );
1209
1210 // Submit 50 events.
1211 for i in 0..50 {
1212 let event = DeferredAuditEvent::from_refusal(
1213 &format!("agent:{i}"),
1214 &refusal_action(),
1215 &refusal_decision(),
1216 )
1217 .unwrap();
1218 queue.submit(event);
1219 }
1220
1221 // Initiate shutdown — close_and_flush drops the queue and
1222 // awaits the supervisor.
1223 close_and_flush(queue, supervisor)
1224 .await
1225 .expect("supervisor must terminate cleanly");
1226
1227 let recorded = recorded.lock().unwrap();
1228 assert_eq!(
1229 recorded.len(),
1230 50,
1231 "shutdown must drain every buffered event"
1232 );
1233 assert_eq!(metrics.appended_count(), 50);
1234 }
1235
1236 // -----------------------------------------------------------------
1237 // drain_pending — shutdown drain that does NOT require the senders to
1238 // be dropped (the daemon's governance hooks hold OnceLock-resident
1239 // sender clones that live for the whole process, so the channel never
1240 // closes — `close_and_flush` would block forever; `drain_pending`
1241 // polls the metrics instead).
1242 // -----------------------------------------------------------------
1243
1244 #[tokio::test]
1245 async fn drain_pending_completes_without_closing_channel() {
1246 let (queue, rx) = DeferredAuditQueue::new();
1247 let metrics = queue.metrics();
1248 let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
1249 let recorded_for_factory = recorded.clone();
1250 let supervisor = spawn_supervised_drainer(
1251 rx,
1252 move || MockSink {
1253 recorded: recorded_for_factory.clone(),
1254 panic_on: None,
1255 error_on: None,
1256 call_count: Arc::new(AtomicU64::new(0)),
1257 },
1258 metrics.clone(),
1259 u32::MAX,
1260 );
1261
1262 for i in 0..50 {
1263 let event = DeferredAuditEvent::from_refusal(
1264 &format!("agent:{i}"),
1265 &refusal_action(),
1266 &refusal_decision(),
1267 )
1268 .unwrap();
1269 queue.submit(event);
1270 }
1271
1272 // Simulate the daemon's OnceLock-held sender by keeping a clone
1273 // alive across the drain — the channel MUST stay open the whole
1274 // time (this is exactly the scenario that wedges close_and_flush).
1275 let hook_held_sender = queue.clone();
1276
1277 let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
1278 assert!(
1279 drained,
1280 "drain_pending must complete while the channel is open"
1281 );
1282 assert!(
1283 hook_held_sender.is_open(),
1284 "channel must still be open — drain_pending must not depend on sender drop"
1285 );
1286 assert_eq!(metrics.appended_count(), 50);
1287 assert_eq!(recorded.lock().unwrap().len(), 50);
1288
1289 // Cleanup: drop both producer handles so the drainer can exit.
1290 drop(hook_held_sender);
1291 drop(queue);
1292 let _ = supervisor.await;
1293 }
1294
1295 #[tokio::test]
1296 async fn drain_pending_times_out_when_drainer_absent() {
1297 // No drainer spawned: submitted events sit in the channel buffer
1298 // forever, so the metrics never advance and drain_pending must
1299 // report a timeout (false) rather than hang.
1300 let (queue, _rx) = DeferredAuditQueue::new();
1301 let metrics = queue.metrics();
1302 for i in 0..3 {
1303 let event = DeferredAuditEvent::from_refusal(
1304 &format!("agent:{i}"),
1305 &refusal_action(),
1306 &refusal_decision(),
1307 )
1308 .unwrap();
1309 queue.submit(event);
1310 }
1311 let drained = drain_pending(&metrics, std::time::Duration::from_millis(100)).await;
1312 assert!(
1313 !drained,
1314 "drain_pending must return false when events never get accounted"
1315 );
1316 assert_eq!(metrics.submitted_count(), 3);
1317 assert_eq!(metrics.appended_count(), 0);
1318 }
1319
1320 #[tokio::test]
1321 async fn drain_pending_returns_immediately_when_already_drained() {
1322 let (queue, _rx) = DeferredAuditQueue::new();
1323 let metrics = queue.metrics();
1324 // Nothing submitted → already drained → returns true at once.
1325 let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
1326 assert!(drained);
1327 }
1328
1329 #[tokio::test]
1330 async fn drain_pending_counts_append_failures_as_accounted() {
1331 // A sink that always errors still "accounts" for the event via
1332 // append_failures, so drain_pending must terminate (the audit row
1333 // is lost/DLQ'd but the shutdown path must not hang on it).
1334 let (queue, rx) = DeferredAuditQueue::new();
1335 let metrics = queue.metrics();
1336 // Factory yields a sink that errors on its first (only) call, so
1337 // the event is "accounted" via append_failures rather than appended.
1338 let supervisor = spawn_supervised_drainer(
1339 rx,
1340 move || MockSink {
1341 recorded: Arc::new(Mutex::new(Vec::new())),
1342 panic_on: None,
1343 error_on: Some(0),
1344 call_count: Arc::new(AtomicU64::new(0)),
1345 },
1346 metrics.clone(),
1347 u32::MAX,
1348 );
1349 let event =
1350 DeferredAuditEvent::from_refusal("agent:err", &refusal_action(), &refusal_decision())
1351 .unwrap();
1352 queue.submit(event);
1353 let hook_held = queue.clone();
1354 let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
1355 assert!(
1356 drained,
1357 "append failures count as accounted — must not hang"
1358 );
1359 assert_eq!(metrics.append_failure_count(), 1);
1360 drop(hook_held);
1361 drop(queue);
1362 let _ = supervisor.await;
1363 }
1364
1365 #[tokio::test]
1366 async fn close_and_flush_works_with_zero_events() {
1367 let (queue, rx) = DeferredAuditQueue::new();
1368 let metrics = queue.metrics();
1369 let supervisor =
1370 spawn_supervised_drainer(rx, move || MockSink::default(), metrics.clone(), u32::MAX);
1371 close_and_flush(queue, supervisor).await.unwrap();
1372 assert_eq!(metrics.appended_count(), 0);
1373 assert_eq!(metrics.submitted_count(), 0);
1374 }
1375
1376 // -----------------------------------------------------------------
1377 // High-volume / backpressure-edge — drainer slow, many submits
1378 // queued, all drain eventually
1379 // -----------------------------------------------------------------
1380
    /// Slow sink: artificial 1 ms delay per append to simulate a
    /// busy fsync path. Accepted events are pushed onto `recorded` so the
    /// test can count them afterwards.
    struct SlowSink {
        // Shared log of accepted events; the test keeps its own handle.
        recorded: Arc<Mutex<Vec<DeferredAuditEvent>>>,
    }
1386
1387 impl DeferredAuditSink for SlowSink {
1388 fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
1389 std::thread::sleep(std::time::Duration::from_millis(1));
1390 self.recorded.lock().unwrap().push(event.clone());
1391 Ok(AppendOutcome::Appended)
1392 }
1393 }
1394
1395 #[tokio::test]
1396 async fn unbounded_queue_handles_burst_no_drops() {
1397 let (queue, rx) = DeferredAuditQueue::new();
1398 let metrics = queue.metrics();
1399 let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
1400 let recorded_for_factory = recorded.clone();
1401 let supervisor = spawn_supervised_drainer(
1402 rx,
1403 move || SlowSink {
1404 recorded: recorded_for_factory.clone(),
1405 },
1406 metrics.clone(),
1407 u32::MAX,
1408 );
1409
1410 // Burst 200 events (drainer is ~1 ms/event so this will
1411 // accumulate before draining).
1412 for i in 0..200 {
1413 let event = DeferredAuditEvent::from_refusal(
1414 &format!("agent:{i}"),
1415 &refusal_action(),
1416 &refusal_decision(),
1417 )
1418 .unwrap();
1419 assert!(
1420 queue.submit(event),
1421 "unbounded queue must never refuse a submit"
1422 );
1423 }
1424 assert_eq!(metrics.submitted_count(), 200);
1425 assert_eq!(metrics.send_failure_count(), 0);
1426
1427 close_and_flush(queue, supervisor).await.unwrap();
1428 let recorded = recorded.lock().unwrap();
1429 assert_eq!(recorded.len(), 200);
1430 assert_eq!(metrics.appended_count(), 200);
1431 }
1432
1433 // -----------------------------------------------------------------
1434 // Production-sink (SqliteSignedEventsSink) integration — opens a
1435 // real SQLite Connection against a temp file and asserts the row
1436 // lands.
1437 // -----------------------------------------------------------------
1438
1439 fn fresh_tempdir() -> tempfile::TempDir {
1440 // Honor project hard rule: no /tmp writes by name. The
1441 // tempfile crate honors TMPDIR (exported at session
1442 // bootstrap to .local-runs/tmp), so this resolves under the
1443 // project-local scratch tree.
1444 tempfile::tempdir().expect("tempdir")
1445 }
1446
1447 #[tokio::test]
1448 async fn sqlite_sink_appends_governance_refusal_row() {
1449 let dir = fresh_tempdir();
1450 let db_path = dir.path().join("def-audit-test.db");
1451 // Pre-create the schema via crate::db::open (applies
1452 // migrations including signed_events).
1453 let _ = crate::db::open(&db_path).expect("init db");
1454
1455 let (queue, rx) = DeferredAuditQueue::new();
1456 let metrics = queue.metrics();
1457 let db_path_buf = db_path.clone();
1458 let supervisor = spawn_supervised_drainer(
1459 rx,
1460 move || SqliteSignedEventsSink::new(db_path_buf.clone()),
1461 metrics.clone(),
1462 u32::MAX,
1463 );
1464
1465 let event =
1466 DeferredAuditEvent::from_refusal("agent:int", &refusal_action(), &refusal_decision())
1467 .unwrap();
1468 queue.submit(event);
1469
1470 close_and_flush(queue, supervisor).await.unwrap();
1471 assert_eq!(metrics.appended_count(), 1);
1472
1473 // Verify the row landed with event_type=governance.refusal.
1474 let conn = crate::db::open(&db_path).expect("reopen db");
1475 let count: i64 = conn
1476 .query_row(
1477 "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1 AND agent_id = ?2",
1478 rusqlite::params![GOVERNANCE_REFUSAL_EVENT_TYPE, "agent:int"],
1479 |r| r.get(0),
1480 )
1481 .unwrap();
1482 assert_eq!(count, 1, "drainer must have written the row");
1483 }
1484
1485 #[tokio::test]
1486 async fn sqlite_sink_lazy_open_only_on_first_append() {
1487 // Construct a sink against a path that doesn't exist yet; if
1488 // we never call append, ensure_conn must never run.
1489 let nonexistent = std::path::PathBuf::from("/this/path/does/not/exist/db.sqlite");
1490 let sink = SqliteSignedEventsSink::new(nonexistent);
1491 // Just verifying construction doesn't open the DB — no
1492 // assertion needed; if `new` opened eagerly, this would
1493 // already have errored.
1494 drop(sink);
1495 }
1496
1497 #[tokio::test]
1498 async fn sqlite_sink_append_fails_on_bad_path_metrics_increments() {
1499 let (queue, rx) = DeferredAuditQueue::new();
1500 let metrics = queue.metrics();
1501 // Build a sink pointing at a path that can't be opened (a
1502 // directory that doesn't exist + a non-creatable subdir
1503 // would do it; under macOS/Linux we use a path under /sys
1504 // which is read-only).
1505 let bad_path =
1506 std::path::PathBuf::from("/nonexistent-readonly-dir-for-deferred-audit-test/db.sqlite");
1507 let supervisor = spawn_supervised_drainer(
1508 rx,
1509 move || SqliteSignedEventsSink::new(bad_path.clone()),
1510 metrics.clone(),
1511 u32::MAX,
1512 );
1513 let event =
1514 DeferredAuditEvent::from_refusal("agent:bad", &refusal_action(), &refusal_decision())
1515 .unwrap();
1516 queue.submit(event);
1517 // Allow the drainer to attempt the append.
1518 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1519 close_and_flush(queue, supervisor).await.unwrap();
1520 assert!(
1521 metrics.append_failure_count() >= 1,
1522 "append failure on bad path must be recorded; got {}",
1523 metrics.append_failure_count()
1524 );
1525 assert_eq!(metrics.appended_count(), 0);
1526 }
1527
1528 // -----------------------------------------------------------------
1529 // install_deferred_audit_drainer end-to-end (the daemon-facing
1530 // installer)
1531 // -----------------------------------------------------------------
1532
1533 #[tokio::test]
1534 async fn installer_returns_open_queue_and_running_supervisor() {
1535 let dir = fresh_tempdir();
1536 let db_path = dir.path().join("installer-test.db");
1537 let _ = crate::db::open(&db_path).expect("init db");
1538
1539 let (queue, supervisor) = install_deferred_audit_drainer(&db_path);
1540 assert!(queue.is_open());
1541
1542 let event = DeferredAuditEvent::from_refusal(
1543 "agent:installer",
1544 &refusal_action(),
1545 &refusal_decision(),
1546 )
1547 .unwrap();
1548 queue.submit(event);
1549
1550 close_and_flush(queue, supervisor).await.unwrap();
1551
1552 let conn = crate::db::open(&db_path).expect("reopen db");
1553 let count: i64 = conn
1554 .query_row(
1555 "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1",
1556 rusqlite::params![GOVERNANCE_REFUSAL_EVENT_TYPE],
1557 |r| r.get(0),
1558 )
1559 .unwrap();
1560 assert_eq!(count, 1);
1561 }
1562
1563 // -----------------------------------------------------------------
1564 // Cluster-C SEC-3 (issue #767) — DLQ + race-retry coverage
1565 // -----------------------------------------------------------------
1566
1567 /// When the sink hits a non-race rusqlite error on the
1568 /// `signed_events` INSERT, the event MUST land in
1569 /// `signed_events_dlq` (NOT silently dropped) and the
1570 /// `dlq_landed` counter MUST bump.
1571 ///
1572 /// Failure injection: pre-fill `signed_events` with a row whose
1573 /// `id` collides with the next UUIDv4 the sink will mint. We can't
1574 /// predict UUIDs, so we instead break the table at the schema
1575 /// level: drop the `event_type` column. The next `append_signed_event`
1576 /// fails on the INSERT (column not found) — an unrecoverable
1577 /// non-race error that triggers DLQ land.
1578 #[tokio::test]
1579 async fn sqlite_sink_lands_event_in_dlq_on_unrecoverable_error() {
1580 let dir = fresh_tempdir();
1581 let db_path = dir.path().join("dlq-test.db");
1582 let _ = crate::db::open(&db_path).expect("init db");
1583
1584 // Inject the fault: drop the NOT-NULL `event_type` column from
1585 // signed_events. Subsequent inserts fail with a constraint
1586 // error (NULL into NOT NULL slot) — an unrecoverable non-race
1587 // error. We do this by rebuilding the table without the
1588 // column; the DLQ table stays intact.
1589 {
1590 let conn = crate::db::open(&db_path).expect("open for fault inject");
1591 conn.execute_batch(
1592 "DROP TABLE signed_events; \
1593 CREATE TABLE signed_events ( \
1594 id TEXT PRIMARY KEY, \
1595 agent_id TEXT NOT NULL, \
1596 payload_hash BLOB NOT NULL, \
1597 signature BLOB, \
1598 attest_level TEXT NOT NULL DEFAULT 'unsigned', \
1599 timestamp TEXT NOT NULL, \
1600 prev_hash BLOB, \
1601 sequence INTEGER \
1602 ); \
1603 CREATE UNIQUE INDEX idx_signed_events_sequence \
1604 ON signed_events(sequence);",
1605 )
1606 .expect("fault-inject schema rewrite");
1607 }
1608
1609 // Spawn drainer + submit one event. The sink will fail the
1610 // append (column missing → schema mismatch is unrecoverable)
1611 // and land it in DLQ.
1612 let (queue, supervisor) = install_deferred_audit_drainer(&db_path);
1613 let metrics = queue.metrics();
1614 let event = DeferredAuditEvent::from_refusal(
1615 "agent:dlq-test",
1616 &refusal_action(),
1617 &refusal_decision(),
1618 )
1619 .unwrap();
1620 queue.submit(event);
1621 close_and_flush(queue, supervisor).await.unwrap();
1622
1623 // Assert: chain row NOT written, DLQ row present, counters
1624 // reflect the outcome.
1625 let conn = crate::db::open(&db_path).expect("reopen db");
1626 let chain_count: i64 = conn
1627 .query_row("SELECT COUNT(*) FROM signed_events", [], |r| r.get(0))
1628 .unwrap_or(0);
1629 assert_eq!(
1630 chain_count, 0,
1631 "signed_events chain MUST NOT advance when sink fails"
1632 );
1633 let dlq_count: i64 = conn
1634 .query_row("SELECT COUNT(*) FROM signed_events_dlq", [], |r| r.get(0))
1635 .unwrap();
1636 assert_eq!(dlq_count, 1, "exactly one DLQ row expected");
1637 let dlq_size_live = dlq_size(&conn).expect("dlq_size");
1638 assert_eq!(dlq_size_live, 1, "dlq_size helper must reflect live count");
1639 assert!(
1640 metrics.dlq_landed_count() >= 1,
1641 "dlq_landed metric must bump on DLQ landing"
1642 );
1643 }
1644
/// Pins the behaviour of `is_unique_constraint_race`: a synthetic
/// `SQLITE_CONSTRAINT_UNIQUE` failure MUST classify as a race, while
/// any other rusqlite failure (here `SQLITE_BUSY`) and any
/// non-rusqlite error MUST NOT. Keeps the helper honest across
/// rusqlite version drift.
#[test]
fn is_unique_constraint_race_classifies_correctly() {
    // Build a synthetic SQLite failure with the given result code and
    // wrap it in an anyhow context layer, so the helper is exercised
    // on a wrapped error rather than a bare `rusqlite::Error`.
    let wrapped_failure = |code: std::os::raw::c_int, message: &str| -> anyhow::Error {
        let raw = rusqlite::Error::SqliteFailure(
            rusqlite::ffi::Error::new(code),
            Some(message.to_string()),
        );
        anyhow::Error::from(raw).context("append signed_event")
    };

    let unique_violation = wrapped_failure(
        rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE,
        "UNIQUE constraint failed",
    );
    let busy = wrapped_failure(rusqlite::ffi::SQLITE_BUSY, "database is locked");

    assert!(is_unique_constraint_race(&unique_violation));
    assert!(!is_unique_constraint_race(&busy));

    // An error with no rusqlite cause at all can never be a UNIQUE race.
    let not_sqlite: anyhow::Error = anyhow::anyhow!("plain error");
    assert!(!is_unique_constraint_race(&not_sqlite));
}
1670
1671 /// `dlq_size` MUST return 0 on a fresh DB (no DLQ rows).
1672 #[tokio::test]
1673 async fn dlq_size_returns_zero_on_fresh_db() {
1674 let dir = fresh_tempdir();
1675 let db_path = dir.path().join("dlq-empty.db");
1676 let conn = crate::db::open(&db_path).expect("init db");
1677 assert_eq!(dlq_size(&conn).expect("dlq_size on fresh"), 0);
1678 }
1679
1680 // -----------------------------------------------------------------
1681 // Metrics — getter coverage
1682 // -----------------------------------------------------------------
1683
/// Every counter getter on a default-constructed
/// `DeferredAuditMetrics` MUST read 0.
#[test]
fn metrics_default_returns_zeroes() {
    let fresh: DeferredAuditMetrics = Default::default();

    // Submission side.
    assert_eq!(fresh.submitted_count(), 0);
    assert_eq!(fresh.send_failure_count(), 0);

    // Drainer / sink side.
    assert_eq!(fresh.appended_count(), 0);
    assert_eq!(fresh.append_failure_count(), 0);
    assert_eq!(fresh.unique_race_retry_count(), 0);
    assert_eq!(fresh.dlq_landed_count(), 0);

    // Panic counter.
    assert_eq!(fresh.panic_count(), 0);
}
1695
/// Cloning `DeferredAuditMetrics` MUST hand back a handle onto the
/// same counters, not an independent snapshot: an increment made
/// through the original has to be visible through the clone.
#[test]
fn metrics_clone_shares_counters() {
    let original = DeferredAuditMetrics::default();
    let handle = Clone::clone(&original);

    // Bump the counter through the original only...
    original.submitted.fetch_add(7, Ordering::Relaxed);

    // ...and read it back through the clone.
    let seen_by_clone = handle.submitted_count();
    assert_eq!(seen_by_clone, 7);
}
1704
1705 // -----------------------------------------------------------------
1706 // GOVERNANCE_REFUSAL_EVENT_TYPE is a stable wire string
1707 // -----------------------------------------------------------------
1708
/// Pins the exact value of `GOVERNANCE_REFUSAL_EVENT_TYPE` so that
/// any change to the constant fails this test.
#[test]
fn governance_refusal_event_type_is_stable() {
    let expected_wire_value = "governance.refusal";
    assert_eq!(GOVERNANCE_REFUSAL_EVENT_TYPE, expected_wire_value);
}
1713}