use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::{Context, Result};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;
use crate::governance::agent_action::{AgentAction, Decision};
use crate::signed_events::{SignedEvent, append_signed_event, payload_hash};
/// Number of retries granted to an append that keeps failing with
/// `SQLITE_CONSTRAINT_UNIQUE`. The first attempt is not counted, so the sink
/// tries at most `APPEND_UNIQUE_RACE_MAX_RETRIES + 1` times before falling
/// back to the DLQ.
const APPEND_UNIQUE_RACE_MAX_RETRIES: usize = 5;
/// `event_type` stamped on every signed event written for a refusal.
pub const GOVERNANCE_REFUSAL_EVENT_TYPE: &str = "governance.refusal";
/// A governance verdict captured so it can be persisted later by a drainer.
///
/// Built by [`DeferredAuditEvent::from_refusal`], carried over the
/// deferred-audit channel, and handed to a [`DeferredAuditSink`].
#[derive(Debug, Clone)]
pub struct DeferredAuditEvent {
/// Identifier of the agent whose action was evaluated.
pub agent_id: String,
/// The action the verdict applies to.
pub action: AgentAction,
/// The verdict itself; `from_refusal` only ever stores refusals here.
pub decision: Decision,
/// Creation time of the event, in UTC.
pub timestamp: chrono::DateTime<chrono::Utc>,
}
impl DeferredAuditEvent {
    /// Builds an event for a refused action, stamped with the current UTC time.
    ///
    /// Returns `None` when `decision.is_refusal()` is false, so callers may
    /// pass any verdict and only refusals produce an event.
    #[must_use]
    pub fn from_refusal(agent_id: &str, action: &AgentAction, decision: &Decision) -> Option<Self> {
        decision.is_refusal().then(|| Self {
            agent_id: agent_id.to_string(),
            action: action.clone(),
            decision: decision.clone(),
            timestamp: chrono::Utc::now(),
        })
    }

    /// Rule id of the stored verdict, or `None` when it is not `Decision::Refuse`.
    #[must_use]
    pub fn rule_id(&self) -> Option<&str> {
        if let Decision::Refuse { rule_id, .. } = &self.decision {
            Some(rule_id.as_str())
        } else {
            None
        }
    }

    /// Reason text of the stored verdict, or `None` when it is not `Decision::Refuse`.
    #[must_use]
    pub fn reason(&self) -> Option<&str> {
        if let Decision::Refuse { reason, .. } = &self.decision {
            Some(reason.as_str())
        } else {
            None
        }
    }

    /// Serializes the event to the JSON bytes that get hashed and signed.
    ///
    /// The object holds `action`, `decision`, `agent_id` and the RFC 3339
    /// rendering of `timestamp`.
    ///
    /// # Errors
    ///
    /// Fails when `serde_json::to_vec` cannot encode the assembled value.
    pub fn canonical_bytes(&self) -> Result<Vec<u8>> {
        let timestamp = self.timestamp.to_rfc3339();
        let payload = serde_json::json!({
            "action": self.action,
            "decision": self.decision,
            "agent_id": self.agent_id,
            "timestamp": timestamp,
        });
        serde_json::to_vec(&payload).context("DeferredAuditEvent::canonical_bytes")
    }
}
/// Counters describing the life of the deferred-audit pipeline.
///
/// Every field is an `Arc<AtomicU64>`, so cloning the struct yields another
/// handle onto the same counters rather than a snapshot.
#[derive(Debug, Clone, Default)]
pub struct DeferredAuditMetrics {
    /// Events handed to the queue's `submit`, whether or not the send worked.
    pub submitted: Arc<AtomicU64>,
    /// Events the sink reported as appended.
    pub appended: Arc<AtomicU64>,
    /// Submissions rejected because the channel was closed.
    pub send_failures: Arc<AtomicU64>,
    /// Events that either landed in the DLQ or failed in the sink outright.
    pub append_failures: Arc<AtomicU64>,
    /// Drainer task panics observed by the supervisor.
    pub drainer_panics: Arc<AtomicU64>,
    /// Events written to the dead-letter table.
    pub dlq_landed: Arc<AtomicU64>,
    /// Append retries triggered by a unique-constraint failure.
    pub unique_race_retries: Arc<AtomicU64>,
}

impl DeferredAuditMetrics {
    /// Relaxed load shared by every accessor below.
    fn read(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Current value of `submitted`.
    #[must_use]
    pub fn submitted_count(&self) -> u64 {
        Self::read(&self.submitted)
    }

    /// Current value of `appended`.
    #[must_use]
    pub fn appended_count(&self) -> u64 {
        Self::read(&self.appended)
    }

    /// Current value of `send_failures`.
    #[must_use]
    pub fn send_failure_count(&self) -> u64 {
        Self::read(&self.send_failures)
    }

    /// Current value of `append_failures`.
    #[must_use]
    pub fn append_failure_count(&self) -> u64 {
        Self::read(&self.append_failures)
    }

    /// Current value of `drainer_panics`.
    #[must_use]
    pub fn panic_count(&self) -> u64 {
        Self::read(&self.drainer_panics)
    }

    /// Current value of `dlq_landed`.
    #[must_use]
    pub fn dlq_landed_count(&self) -> u64 {
        Self::read(&self.dlq_landed)
    }

    /// Current value of `unique_race_retries`.
    #[must_use]
    pub fn unique_race_retry_count(&self) -> u64 {
        Self::read(&self.unique_race_retries)
    }
}
/// Producer handle for the deferred-audit channel.
///
/// Clones share the same unbounded channel and the same metric counters.
#[derive(Clone)]
pub struct DeferredAuditQueue {
/// Sending half of the unbounded channel feeding the drainer.
sender: UnboundedSender<DeferredAuditEvent>,
/// Counters updated by `submit`.
metrics: DeferredAuditMetrics,
}
impl DeferredAuditQueue {
    /// Creates a queue with fresh metrics and returns it together with the
    /// receiving half of its unbounded channel.
    #[must_use]
    pub fn new() -> (Self, UnboundedReceiver<DeferredAuditEvent>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        let metrics = DeferredAuditMetrics::default();
        (Self { sender, metrics }, receiver)
    }

    /// Enqueues `event` for the drainer.
    ///
    /// `submitted` is incremented on every call. When the channel is closed,
    /// `send_failures` is incremented too, a warning is logged, and `false`
    /// is returned.
    pub fn submit(&self, event: DeferredAuditEvent) -> bool {
        self.metrics.submitted.fetch_add(1, Ordering::Relaxed);
        if self.sender.send(event).is_ok() {
            return true;
        }
        self.metrics.send_failures.fetch_add(1, Ordering::Relaxed);
        tracing::warn!(
            "deferred_audit_queue: submit failed (drainer receiver closed); \
             audit chain row LOST for this refusal"
        );
        false
    }

    /// Enqueues an event only when `decision` is a refusal.
    ///
    /// Returns `false` both for non-refusal verdicts (nothing is counted or
    /// sent) and for refusals whose send failed.
    pub fn submit_refusal(
        &self,
        agent_id: &str,
        action: &AgentAction,
        decision: &Decision,
    ) -> bool {
        match DeferredAuditEvent::from_refusal(agent_id, action, decision) {
            Some(event) => self.submit(event),
            None => false,
        }
    }

    /// Returns another handle onto this queue's counters.
    #[must_use]
    pub fn metrics(&self) -> DeferredAuditMetrics {
        self.metrics.clone()
    }

    /// `true` while the sender does not report the channel as closed.
    #[must_use]
    pub fn is_open(&self) -> bool {
        let closed = self.sender.is_closed();
        !closed
    }
}
/// Where a successfully handled event ended up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendOutcome {
/// The event was appended by the sink; the drainer counts it in `appended`.
Appended,
/// The event was stored in the dead-letter queue instead; the drainer
/// counts it in `append_failures`.
DlqLanded,
}
/// Destination for drained audit events.
///
/// The `Send + 'static` bounds let an implementation be moved into the
/// spawned drainer task.
pub trait DeferredAuditSink: Send + 'static {
/// Persists one event.
///
/// `Ok` reports where the event ended up (see [`AppendOutcome`]). The
/// drainer treats `Err` as an append failure with no DLQ landing.
fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome>;
}
/// [`DeferredAuditSink`] that writes refusals into a SQLite database.
pub struct SqliteSignedEventsSink {
/// Database file passed to `crate::db::open` when a connection is first needed.
db_path: PathBuf,
/// Connection cached by `ensure_conn`; `None` until the first successful open.
conn: Option<rusqlite::Connection>,
/// Optional counters; when present, DLQ landings and unique-race retries
/// are recorded on them.
metrics: Option<DeferredAuditMetrics>,
}
impl SqliteSignedEventsSink {
    /// Creates a sink for `db_path` that records no metrics.
    ///
    /// The database is not opened here; that happens on the first append.
    #[must_use]
    pub fn new(db_path: impl Into<PathBuf>) -> Self {
        Self::build(db_path.into(), None)
    }

    /// Creates a sink for `db_path` that bumps `metrics` on DLQ landings and
    /// unique-race retries.
    #[must_use]
    pub fn with_metrics(db_path: impl Into<PathBuf>, metrics: DeferredAuditMetrics) -> Self {
        Self::build(db_path.into(), Some(metrics))
    }

    /// Shared constructor: no connection yet, metrics as given.
    fn build(db_path: PathBuf, metrics: Option<DeferredAuditMetrics>) -> Self {
        Self {
            db_path,
            conn: None,
            metrics,
        }
    }

    /// Returns the cached connection, opening it first when absent.
    ///
    /// A failed open leaves the cache empty, so the next call tries again.
    fn ensure_conn(&mut self) -> Result<&rusqlite::Connection> {
        if self.conn.is_none() {
            let opened = crate::db::open(&self.db_path).with_context(|| {
                format!(
                    "SqliteSignedEventsSink: open {} for deferred-audit drainer",
                    self.db_path.display()
                )
            })?;
            self.conn = Some(opened);
        }
        Ok(self.conn.as_ref().expect("conn populated above"))
    }

    /// Counts one DLQ landing when metrics are attached.
    fn bump_dlq(&self) {
        if let Some(metrics) = self.metrics.as_ref() {
            metrics.dlq_landed.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Counts one unique-constraint retry when metrics are attached.
    fn bump_race_retry(&self) {
        if let Some(metrics) = self.metrics.as_ref() {
            metrics.unique_race_retries.fetch_add(1, Ordering::Relaxed);
        }
    }
}
impl DeferredAuditSink for SqliteSignedEventsSink {
    /// Signs `event` and appends it to the `signed_events` chain, falling back
    /// to the `signed_events_dlq` table when the append cannot succeed.
    ///
    /// Steps:
    /// 1. hash the event's canonical bytes and try to sign the hash (the
    ///    unsigned attestation level is recorded when no signature comes back);
    /// 2. call `append_signed_event`, retrying up to
    ///    `APPEND_UNIQUE_RACE_MAX_RETRIES` times while the failure is a
    ///    `SQLITE_CONSTRAINT_UNIQUE` error;
    /// 3. on any other error, or once the retry budget is spent, insert the
    ///    event plus the failure reason into `signed_events_dlq`.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the canonical bytes cannot be produced, the database
    /// cannot be opened, or the DLQ insert itself fails. In those cases the
    /// event was written to neither table by this call.
    fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
        let bytes = event
            .canonical_bytes()
            .context("SqliteSignedEventsSink: canonical_bytes")?;
        let hash = payload_hash(&bytes);
        let (signature, attest_level) =
            match crate::governance::audit::try_sign_audit_payload(&hash) {
                Some((sig, level)) => (Some(sig), level.to_string()),
                None => (
                    None,
                    crate::models::AttestLevel::Unsigned.as_str().to_string(),
                ),
            };
        let signed = SignedEvent {
            id: uuid::Uuid::new_v4().to_string(),
            agent_id: event.agent_id.clone(),
            event_type: GOVERNANCE_REFUSAL_EVENT_TYPE.to_string(),
            payload_hash: hash,
            signature,
            attest_level,
            timestamp: event.timestamp.to_rfc3339(),
            ..SignedEvent::default()
        };

        // The loop can only end by returning success or by breaking with the
        // error that stopped it, so no placeholder error is ever needed.
        let mut attempt = 0usize;
        let err = loop {
            let conn = self.ensure_conn()?;
            match append_signed_event(conn, &signed) {
                Ok(()) => return Ok(AppendOutcome::Appended),
                Err(e)
                    if is_unique_constraint_race(&e)
                        && attempt < APPEND_UNIQUE_RACE_MAX_RETRIES =>
                {
                    self.bump_race_retry();
                    tracing::warn!(
                        attempt = attempt + 1,
                        db = %self.db_path.display(),
                        "deferred_audit sink: SQLITE_CONSTRAINT_UNIQUE on signed_events.sequence — \
                         chain-head race; retrying (budget {APPEND_UNIQUE_RACE_MAX_RETRIES})"
                    );
                    attempt += 1;
                }
                Err(e) => break e,
            }
        };

        // `{:#}` keeps the whole context chain in the stored reason.
        let failure_reason = format!("{err:#}");
        let conn = self.ensure_conn()?;
        conn.execute(
            "INSERT INTO signed_events_dlq \
             (id, agent_id, event_type, payload_hash, signature, attest_level, \
             timestamp, failure_reason, failed_at) \
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
            rusqlite::params![
                signed.id,
                signed.agent_id,
                signed.event_type,
                signed.payload_hash,
                signed.signature,
                signed.attest_level,
                signed.timestamp,
                failure_reason,
                chrono::Utc::now().to_rfc3339(),
            ],
        )
        .context("SqliteSignedEventsSink: DLQ insert")?;
        tracing::error!(
            failure_reason = %failure_reason,
            agent_id = %signed.agent_id,
            event_id = %signed.id,
            "deferred_audit sink: append exhausted retries or hit non-race error — \
             event landed in signed_events_dlq (chain row NOT advanced; replay needed)"
        );
        self.bump_dlq();
        Ok(AppendOutcome::DlqLanded)
    }
}
fn is_unique_constraint_race(err: &anyhow::Error) -> bool {
for cause in err.chain() {
if let Some(rusqlite::Error::SqliteFailure(code, _)) =
cause.downcast_ref::<rusqlite::Error>()
{
if code.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE {
return true;
}
}
}
false
}
/// Counts the rows currently stored in `signed_events_dlq`.
///
/// # Errors
///
/// Fails when the `COUNT(*)` query cannot be executed or read.
pub fn dlq_size(conn: &rusqlite::Connection) -> Result<u64> {
    conn.query_row("SELECT COUNT(*) FROM signed_events_dlq", [], |row| {
        row.get::<_, i64>(0)
    })
    .context("dlq_size: SELECT COUNT")
    // A value that does not fit `u64` (i.e. negative) is reported as zero.
    .map(|rows| u64::try_from(rows).unwrap_or(0))
}
/// Spawns the task that feeds every received event to `sink`.
///
/// The task runs until `recv` yields `None` and then resolves to the
/// receiver. Each event bumps exactly one counter: `appended` for
/// [`AppendOutcome::Appended`], `append_failures` for a DLQ landing or a sink
/// error. `sink.append` is called synchronously inside the task.
#[must_use]
pub fn spawn_drainer_task<S: DeferredAuditSink + 'static>(
    mut receiver: UnboundedReceiver<DeferredAuditEvent>,
    mut sink: S,
    metrics: DeferredAuditMetrics,
) -> JoinHandle<UnboundedReceiver<DeferredAuditEvent>> {
    tokio::spawn(async move {
        while let Some(event) = receiver.recv().await {
            let outcome = sink.append(&event);

            // Account for the event first, then report anything unusual.
            let counter = match &outcome {
                Ok(AppendOutcome::Appended) => &metrics.appended,
                Ok(AppendOutcome::DlqLanded) | Err(_) => &metrics.append_failures,
            };
            counter.fetch_add(1, Ordering::Relaxed);

            match outcome {
                Ok(AppendOutcome::Appended) => {}
                Ok(AppendOutcome::DlqLanded) => {
                    tracing::warn!(
                        "deferred_audit drainer: event landed in DLQ \
                         (audit chain row NOT advanced; operator replay needed)"
                    );
                }
                Err(e) => {
                    tracing::error!(
                        "deferred_audit drainer: sink.append failed (no DLQ landing either): {:#}",
                        e
                    );
                }
            }
        }
        receiver
    })
}
/// Spawns a drainer built from `make_sink` and a supervisor that watches it.
///
/// `make_sink` is called once. When the drainer finishes normally the
/// returned receiver is dropped. When it panics, `drainer_panics` is bumped
/// and an error is logged. Any other join failure is logged as a warning.
/// No restart is attempted: `max_restarts` only appears in the panic log line.
#[must_use]
pub fn spawn_supervised_drainer<F, S>(
    receiver: UnboundedReceiver<DeferredAuditEvent>,
    make_sink: F,
    metrics: DeferredAuditMetrics,
    max_restarts: u32,
) -> JoinHandle<()>
where
    F: Fn() -> S + Send + 'static,
    S: DeferredAuditSink + 'static,
{
    tokio::spawn(async move {
        let drainer = spawn_drainer_task(receiver, make_sink(), metrics.clone());
        let join_err = match drainer.await {
            Ok(returned_receiver) => {
                drop(returned_receiver);
                return;
            }
            Err(join_err) => join_err,
        };

        if join_err.is_panic() {
            metrics.drainer_panics.fetch_add(1, Ordering::Relaxed);
            tracing::error!(
                "deferred_audit supervisor: drainer task panicked ({join_err}); \
                 max_restarts={max_restarts} — receiver moved into the panicked \
                 task and cannot be recovered; future refusals submitted to the \
                 existing queue will fail to land. Operator action required: \
                 rebuild the daemon's deferred-audit queue (or restart the daemon) \
                 to restore the audit-chain property."
            );
        } else {
            tracing::warn!(
                "deferred_audit supervisor: drainer aborted ({join_err}); \
                 pending events may be lost"
            );
        }
    })
}
/// Drops `queue` and then waits for the supervisor task to finish.
///
/// The drainer loop only ends once `recv` yields `None`, so if other clones of
/// the queue are still alive this future keeps waiting.
/// NOTE(review): confirm callers release every clone before calling this.
///
/// # Errors
///
/// Returns the `JoinError` produced by awaiting `supervisor`.
pub async fn close_and_flush(
queue: DeferredAuditQueue,
supervisor: JoinHandle<()>,
) -> std::result::Result<(), tokio::task::JoinError> {
// Release this sender handle before waiting on the supervisor.
drop(queue);
supervisor.await
}
/// Default drain time budget: `SHUTDOWN_DRAIN_TIMEOUT_SECS` seconds.
/// NOTE(review): presumably meant as the `timeout` argument of
/// `drain_pending`; no caller is visible in this file.
pub const DEFAULT_SHUTDOWN_DRAIN_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(SHUTDOWN_DRAIN_TIMEOUT_SECS);
/// Seconds behind `DEFAULT_SHUTDOWN_DRAIN_TIMEOUT`.
const SHUTDOWN_DRAIN_TIMEOUT_SECS: u64 = 5;
/// Sleep between two checks of the counters in `drain_pending`.
const DRAIN_POLL_INTERVAL: std::time::Duration =
std::time::Duration::from_millis(DRAIN_POLL_INTERVAL_MILLIS);
/// Milliseconds behind `DRAIN_POLL_INTERVAL`.
const DRAIN_POLL_INTERVAL_MILLIS: u64 = 10;
/// Waits until every submitted event has been accounted for, or `timeout`
/// elapses.
///
/// An event counts as accounted once it shows up in `appended`,
/// `append_failures` or `send_failures`. The counters are checked before the
/// deadline, so an already-drained queue returns `true` even with a zero
/// timeout. Between checks the task sleeps for `DRAIN_POLL_INTERVAL`.
///
/// Returns `true` when drained, `false` when the deadline passed first.
pub async fn drain_pending(metrics: &DeferredAuditMetrics, timeout: std::time::Duration) -> bool {
    let deadline = tokio::time::Instant::now() + timeout;
    while drain_accounted(metrics) < metrics.submitted_count() {
        if tokio::time::Instant::now() >= deadline {
            return false;
        }
        tokio::time::sleep(DRAIN_POLL_INTERVAL).await;
    }
    true
}
/// Number of submitted events whose fate is known: appended, failed in the
/// sink (including DLQ landings), or never sent. Saturates instead of
/// overflowing.
#[must_use]
fn drain_accounted(metrics: &DeferredAuditMetrics) -> u64 {
    let appended = metrics.appended_count();
    let failed = metrics.append_failure_count();
    let unsent = metrics.send_failure_count();
    appended.saturating_add(failed).saturating_add(unsent)
}
/// Wires up the whole pipeline for the database at `db_path`.
///
/// Creates a queue, then spawns a supervised drainer whose sink is a
/// [`SqliteSignedEventsSink`] sharing the queue's metrics. `u32::MAX` is
/// passed as `max_restarts`. Returns the queue and the supervisor handle.
#[must_use]
pub fn install_deferred_audit_drainer(db_path: &Path) -> (DeferredAuditQueue, JoinHandle<()>) {
    let (queue, receiver) = DeferredAuditQueue::new();
    let sink_metrics = queue.metrics();
    let sink_db_path = db_path.to_path_buf();
    let make_sink = move || {
        SqliteSignedEventsSink::with_metrics(sink_db_path.clone(), sink_metrics.clone())
    };
    let supervisor = spawn_supervised_drainer(receiver, make_sink, queue.metrics(), u32::MAX);
    (queue, supervisor)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
/// Fixture: a custom `memory_write` action aimed at the `secrets/api` namespace.
fn refusal_action() -> AgentAction {
    let payload = serde_json::json!({"namespace": "secrets/api"});
    AgentAction::Custom {
        custom_kind: String::from("memory_write"),
        payload,
    }
}
/// Fixture: a refusal issued by rule `R001`.
fn refusal_decision() -> Decision {
    let rule_id = String::from("R001");
    let reason = String::from("no writes to secrets/*");
    Decision::Refuse { rule_id, reason }
}
// A `Refuse` verdict yields an event carrying the agent id, rule id and reason.
#[test]
fn from_refusal_returns_some_for_refuse() {
    let (action, decision) = (refusal_action(), refusal_decision());
    let built = DeferredAuditEvent::from_refusal("agent:alice", &action, &decision)
        .expect("must be Some for Refuse verdict");
    assert_eq!(built.agent_id, "agent:alice");
    assert_eq!(built.rule_id(), Some("R001"));
    assert_eq!(built.reason(), Some("no writes to secrets/*"));
}
// An `Allow` verdict produces no event.
#[test]
fn from_refusal_returns_none_for_allow() {
    let action = refusal_action();
    let outcome = DeferredAuditEvent::from_refusal("agent:alice", &action, &Decision::Allow);
    assert!(outcome.is_none(), "Allow verdict must not enqueue an event");
}
// A `Warn` verdict produces no event either.
#[test]
fn from_refusal_returns_none_for_warn() {
    let action = refusal_action();
    let verdict = Decision::Warn {
        rule_id: String::from("W001"),
        reason: String::from("warning only"),
    };
    let outcome = DeferredAuditEvent::from_refusal("agent:alice", &action, &verdict);
    assert!(outcome.is_none(), "Warn verdict must not enqueue a refusal");
}
// The canonical payload mentions the rule id, the action kind and the agent id.
#[test]
fn canonical_bytes_includes_rule_and_action() {
    let (action, decision) = (refusal_action(), refusal_decision());
    let event = DeferredAuditEvent::from_refusal("agent:alice", &action, &decision).unwrap();
    let bytes = event.canonical_bytes().unwrap();
    let text = std::str::from_utf8(&bytes).unwrap();
    let expectations = [
        ("R001", "canonical payload must include rule_id"),
        ("memory_write", "canonical payload must include action kind"),
        ("agent:alice", "canonical payload must include agent id"),
    ];
    for (needle, why) in expectations {
        assert!(text.contains(needle), "{why}");
    }
}
// The accessors return `None` when the stored verdict is not a refusal.
#[test]
fn rule_id_returns_none_for_non_refusal() {
    let allowed = DeferredAuditEvent {
        agent_id: String::from("x"),
        action: refusal_action(),
        decision: Decision::Allow,
        timestamp: chrono::Utc::now(),
    };
    assert!(allowed.rule_id().is_none());
    assert!(allowed.reason().is_none());
}
// A freshly created queue is open and has counted no submissions.
#[tokio::test]
async fn queue_new_returns_open_handle() {
    let (queue, _receiver) = DeferredAuditQueue::new();
    assert!(queue.is_open());
    let submitted = queue.metrics().submitted_count();
    assert_eq!(submitted, 0);
}
// With a live receiver, `submit` succeeds and the event arrives intact.
#[tokio::test]
async fn submit_with_receiver_attached_succeeds() {
    let (queue, mut rx) = DeferredAuditQueue::new();
    let event =
        DeferredAuditEvent::from_refusal("agent:t", &refusal_action(), &refusal_decision())
            .unwrap();
    let expected_agent = event.agent_id.clone();
    assert!(queue.submit(event));
    assert_eq!(queue.metrics().submitted_count(), 1);
    let delivered = rx.recv().await.unwrap();
    assert_eq!(delivered.agent_id, expected_agent);
    assert_eq!(delivered.rule_id(), Some("R001"));
}
// Once the receiver is gone, `submit` returns false and counts a send failure.
#[tokio::test]
async fn submit_after_receiver_dropped_records_send_failure() {
    let (queue, receiver) = DeferredAuditQueue::new();
    drop(receiver);
    assert!(!queue.is_open());
    let event =
        DeferredAuditEvent::from_refusal("agent:t", &refusal_action(), &refusal_decision())
            .unwrap();
    let accepted = queue.submit(event);
    assert!(!accepted, "submit must return false when receiver is closed");
    let metrics = queue.metrics();
    assert_eq!(metrics.submitted_count(), 1);
    assert_eq!(metrics.send_failure_count(), 1);
}
// `submit_refusal` ignores an `Allow` verdict and enqueues a `Refuse` one.
#[tokio::test]
async fn submit_refusal_helper_skips_non_refusals() {
    let (queue, mut rx) = DeferredAuditQueue::new();
    let action = refusal_action();

    let allowed = queue.submit_refusal("agent:t", &action, &Decision::Allow);
    assert!(!allowed);
    let waited = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
    assert!(waited.is_err(), "no event should have been enqueued");

    let refused = queue.submit_refusal("agent:t", &action, &refusal_decision());
    assert!(refused);
    let delivered = rx.recv().await.unwrap();
    assert_eq!(delivered.agent_id, "agent:t");
}
// A cloned queue feeds the same receiver and the same counters.
#[tokio::test]
async fn queue_clone_shares_underlying_channel() {
    let (queue, mut rx) = DeferredAuditQueue::new();
    let clone = queue.clone();
    let (action, decision) = (refusal_action(), refusal_decision());
    let first = DeferredAuditEvent::from_refusal("agent:a", &action, &decision).unwrap();
    let second = DeferredAuditEvent::from_refusal("agent:b", &action, &decision).unwrap();
    queue.submit(first);
    clone.submit(second);

    let mut agents = Vec::with_capacity(2);
    for _ in 0..2 {
        agents.push(rx.recv().await.unwrap().agent_id);
    }
    assert!(agents.contains(&"agent:a".to_string()));
    assert!(agents.contains(&"agent:b".to_string()));
    assert_eq!(queue.metrics().submitted_count(), 2);
    assert_eq!(clone.metrics().submitted_count(), 2);
}
#[derive(Clone, Default)]
struct MockSink {
recorded: Arc<Mutex<Vec<DeferredAuditEvent>>>,
panic_on: Option<usize>,
error_on: Option<usize>,
call_count: Arc<AtomicU64>,
}
impl DeferredAuditSink for MockSink {
fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
let prior = self.call_count.fetch_add(1, Ordering::SeqCst) as usize;
if Some(prior) == self.panic_on {
panic!("mock sink: configured panic at call {prior}");
}
if Some(prior) == self.error_on {
return Err(anyhow::anyhow!(
"mock sink: configured error at call {prior}"
));
}
self.recorded.lock().unwrap().push(event.clone());
Ok(AppendOutcome::Appended)
}
}
#[tokio::test]
async fn drainer_appends_every_submitted_event() {
let (queue, rx) = DeferredAuditQueue::new();
let metrics = queue.metrics();
let sink = MockSink::default();
let recorded = sink.recorded.clone();
let handle = spawn_drainer_task(rx, sink, metrics.clone());
for i in 0..5 {
let mut event = DeferredAuditEvent::from_refusal(
&format!("agent:{i}"),
&refusal_action(),
&refusal_decision(),
)
.unwrap();
event.timestamp = chrono::Utc::now();
queue.submit(event);
}
drop(queue);
let _returned_rx = handle.await.unwrap();
let recorded = recorded.lock().unwrap();
assert_eq!(recorded.len(), 5);
for (i, ev) in recorded.iter().enumerate() {
assert_eq!(ev.agent_id, format!("agent:{i}"));
}
assert_eq!(metrics.appended_count(), 5);
}
#[tokio::test]
async fn drainer_continues_after_sink_error() {
let (queue, rx) = DeferredAuditQueue::new();
let metrics = queue.metrics();
let mut sink = MockSink::default();
sink.error_on = Some(1);
let recorded = sink.recorded.clone();
let handle = spawn_drainer_task(rx, sink, metrics.clone());
for i in 0..3 {
let event = DeferredAuditEvent::from_refusal(
&format!("agent:{i}"),
&refusal_action(),
&refusal_decision(),
)
.unwrap();
queue.submit(event);
}
drop(queue);
let _ = handle.await.unwrap();
let recorded = recorded.lock().unwrap();
assert_eq!(recorded.len(), 2);
assert_eq!(metrics.appended_count(), 2);
assert_eq!(metrics.append_failure_count(), 1);
}
#[tokio::test]
async fn supervisor_records_panic_metric_on_drainer_panic() {
let (queue, rx) = DeferredAuditQueue::new();
let metrics = queue.metrics();
let panic_on = Some(0_usize);
let supervisor = spawn_supervised_drainer(
rx,
move || MockSink {
recorded: Arc::new(Mutex::new(Vec::new())),
panic_on,
error_on: None,
call_count: Arc::new(AtomicU64::new(0)),
},
metrics.clone(),
1, );
let event =
DeferredAuditEvent::from_refusal("agent:panic", &refusal_action(), &refusal_decision())
.unwrap();
queue.submit(event);
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), supervisor)
.await
.expect("supervisor must exit after observing panic");
assert_eq!(
metrics.panic_count(),
1,
"supervisor must record exactly one drainer panic"
);
}
#[tokio::test]
async fn supervisor_graceful_shutdown_drains_buffered_events() {
let (queue, rx) = DeferredAuditQueue::new();
let metrics = queue.metrics();
let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
let recorded_for_factory = recorded.clone();
let supervisor = spawn_supervised_drainer(
rx,
move || MockSink {
recorded: recorded_for_factory.clone(),
panic_on: None,
error_on: None,
call_count: Arc::new(AtomicU64::new(0)),
},
metrics.clone(),
u32::MAX,
);
for i in 0..50 {
let event = DeferredAuditEvent::from_refusal(
&format!("agent:{i}"),
&refusal_action(),
&refusal_decision(),
)
.unwrap();
queue.submit(event);
}
close_and_flush(queue, supervisor)
.await
.expect("supervisor must terminate cleanly");
let recorded = recorded.lock().unwrap();
assert_eq!(
recorded.len(),
50,
"shutdown must drain every buffered event"
);
assert_eq!(metrics.appended_count(), 50);
}
#[tokio::test]
async fn drain_pending_completes_without_closing_channel() {
let (queue, rx) = DeferredAuditQueue::new();
let metrics = queue.metrics();
let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::new(Mutex::new(Vec::new()));
let recorded_for_factory = recorded.clone();
let supervisor = spawn_supervised_drainer(
rx,
move || MockSink {
recorded: recorded_for_factory.clone(),
panic_on: None,
error_on: None,
call_count: Arc::new(AtomicU64::new(0)),
},
metrics.clone(),
u32::MAX,
);
for i in 0..50 {
let event = DeferredAuditEvent::from_refusal(
&format!("agent:{i}"),
&refusal_action(),
&refusal_decision(),
)
.unwrap();
queue.submit(event);
}
let hook_held_sender = queue.clone();
let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
assert!(
drained,
"drain_pending must complete while the channel is open"
);
assert!(
hook_held_sender.is_open(),
"channel must still be open — drain_pending must not depend on sender drop"
);
assert_eq!(metrics.appended_count(), 50);
assert_eq!(recorded.lock().unwrap().len(), 50);
drop(hook_held_sender);
drop(queue);
let _ = supervisor.await;
}
// With nothing consuming the channel, `drain_pending` gives up at the deadline.
#[tokio::test]
async fn drain_pending_times_out_when_drainer_absent() {
    let (queue, _rx) = DeferredAuditQueue::new();
    let metrics = queue.metrics();
    let (action, decision) = (refusal_action(), refusal_decision());
    for i in 0..3 {
        let event =
            DeferredAuditEvent::from_refusal(&format!("agent:{i}"), &action, &decision).unwrap();
        queue.submit(event);
    }

    let budget = std::time::Duration::from_millis(100);
    let drained = drain_pending(&metrics, budget).await;
    assert!(
        !drained,
        "drain_pending must return false when events never get accounted"
    );
    assert_eq!(metrics.submitted_count(), 3);
    assert_eq!(metrics.appended_count(), 0);
}
// Nothing submitted means nothing to wait for.
#[tokio::test]
async fn drain_pending_returns_immediately_when_already_drained() {
    let (queue, _rx) = DeferredAuditQueue::new();
    let metrics = queue.metrics();
    let budget = std::time::Duration::from_secs(5);
    assert!(drain_pending(&metrics, budget).await);
}
#[tokio::test]
async fn drain_pending_counts_append_failures_as_accounted() {
let (queue, rx) = DeferredAuditQueue::new();
let metrics = queue.metrics();
let supervisor = spawn_supervised_drainer(
rx,
move || MockSink {
recorded: Arc::new(Mutex::new(Vec::new())),
panic_on: None,
error_on: Some(0),
call_count: Arc::new(AtomicU64::new(0)),
},
metrics.clone(),
u32::MAX,
);
let event =
DeferredAuditEvent::from_refusal("agent:err", &refusal_action(), &refusal_decision())
.unwrap();
queue.submit(event);
let hook_held = queue.clone();
let drained = drain_pending(&metrics, std::time::Duration::from_secs(5)).await;
assert!(
drained,
"append failures count as accounted — must not hang"
);
assert_eq!(metrics.append_failure_count(), 1);
drop(hook_held);
drop(queue);
let _ = supervisor.await;
}
// Shutting down an idle pipeline succeeds and leaves the counters at zero.
#[tokio::test]
async fn close_and_flush_works_with_zero_events() {
    let (queue, rx) = DeferredAuditQueue::new();
    let metrics = queue.metrics();
    let supervisor = spawn_supervised_drainer(rx, MockSink::default, metrics.clone(), u32::MAX);
    close_and_flush(queue, supervisor).await.unwrap();
    assert_eq!(metrics.appended_count(), 0);
    assert_eq!(metrics.submitted_count(), 0);
}
/// Test sink that blocks for one millisecond per event before recording it.
struct SlowSink {
    /// Events accepted so far.
    recorded: Arc<Mutex<Vec<DeferredAuditEvent>>>,
}

impl DeferredAuditSink for SlowSink {
    fn append(&mut self, event: &DeferredAuditEvent) -> Result<AppendOutcome> {
        let delay = std::time::Duration::from_millis(1);
        std::thread::sleep(delay);
        let mut log = self.recorded.lock().unwrap();
        log.push(event.clone());
        Ok(AppendOutcome::Appended)
    }
}
// A burst of 200 submissions against a slow sink loses nothing.
#[tokio::test]
async fn unbounded_queue_handles_burst_no_drops() {
    let (queue, rx) = DeferredAuditQueue::new();
    let metrics = queue.metrics();
    let recorded: Arc<Mutex<Vec<DeferredAuditEvent>>> = Arc::default();
    let sink_log = Arc::clone(&recorded);
    let supervisor = spawn_supervised_drainer(
        rx,
        move || SlowSink {
            recorded: Arc::clone(&sink_log),
        },
        metrics.clone(),
        u32::MAX,
    );

    let (action, decision) = (refusal_action(), refusal_decision());
    for i in 0..200 {
        let event =
            DeferredAuditEvent::from_refusal(&format!("agent:{i}"), &action, &decision).unwrap();
        let accepted = queue.submit(event);
        assert!(accepted, "unbounded queue must never refuse a submit");
    }
    assert_eq!(metrics.submitted_count(), 200);
    assert_eq!(metrics.send_failure_count(), 0);

    close_and_flush(queue, supervisor).await.unwrap();
    assert_eq!(recorded.lock().unwrap().len(), 200);
    assert_eq!(metrics.appended_count(), 200);
}
/// Creates a temporary directory for one test, panicking when that fails.
fn fresh_tempdir() -> tempfile::TempDir {
    let created = tempfile::tempdir();
    created.expect("tempdir")
}
// End to end: a submitted refusal becomes one `signed_events` row.
#[tokio::test]
async fn sqlite_sink_appends_governance_refusal_row() {
    let dir = fresh_tempdir();
    let db_path = dir.path().join("def-audit-test.db");
    let _ = crate::db::open(&db_path).expect("init db");

    let (queue, rx) = DeferredAuditQueue::new();
    let metrics = queue.metrics();
    let sink_db_path = db_path.clone();
    let supervisor = spawn_supervised_drainer(
        rx,
        move || SqliteSignedEventsSink::new(sink_db_path.clone()),
        metrics.clone(),
        u32::MAX,
    );

    let event =
        DeferredAuditEvent::from_refusal("agent:int", &refusal_action(), &refusal_decision())
            .unwrap();
    queue.submit(event);
    close_and_flush(queue, supervisor).await.unwrap();
    assert_eq!(metrics.appended_count(), 1);

    let conn = crate::db::open(&db_path).expect("reopen db");
    let rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1 AND agent_id = ?2",
            rusqlite::params![GOVERNANCE_REFUSAL_EVENT_TYPE, "agent:int"],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(rows, 1, "drainer must have written the row");
}
// Constructing a sink for an unusable path must not fail; nothing is opened yet.
#[tokio::test]
async fn sqlite_sink_lazy_open_only_on_first_append() {
    let missing = std::path::PathBuf::from("/this/path/does/not/exist/db.sqlite");
    let unopened = SqliteSignedEventsSink::new(missing);
    drop(unopened);
}
// An unopenable database surfaces as an append failure, never as an append.
#[tokio::test]
async fn sqlite_sink_append_fails_on_bad_path_metrics_increments() {
    let (queue, rx) = DeferredAuditQueue::new();
    let metrics = queue.metrics();
    let bad_path =
        std::path::PathBuf::from("/nonexistent-readonly-dir-for-deferred-audit-test/db.sqlite");
    let supervisor = spawn_supervised_drainer(
        rx,
        move || SqliteSignedEventsSink::new(bad_path.clone()),
        metrics.clone(),
        u32::MAX,
    );

    let event =
        DeferredAuditEvent::from_refusal("agent:bad", &refusal_action(), &refusal_decision())
            .unwrap();
    queue.submit(event);
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    close_and_flush(queue, supervisor).await.unwrap();

    let failures = metrics.append_failure_count();
    assert!(
        failures >= 1,
        "append failure on bad path must be recorded; got {}",
        failures
    );
    assert_eq!(metrics.appended_count(), 0);
}
// The installer hands back a working queue whose events reach the database.
#[tokio::test]
async fn installer_returns_open_queue_and_running_supervisor() {
    let dir = fresh_tempdir();
    let db_path = dir.path().join("installer-test.db");
    let _ = crate::db::open(&db_path).expect("init db");

    let (queue, supervisor) = install_deferred_audit_drainer(&db_path);
    assert!(queue.is_open());
    let (action, decision) = (refusal_action(), refusal_decision());
    let event = DeferredAuditEvent::from_refusal("agent:installer", &action, &decision).unwrap();
    queue.submit(event);
    close_and_flush(queue, supervisor).await.unwrap();

    let conn = crate::db::open(&db_path).expect("reopen db");
    let rows: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM signed_events WHERE event_type = ?1",
            rusqlite::params![GOVERNANCE_REFUSAL_EVENT_TYPE],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(rows, 1);
}
// When the append cannot succeed, the event goes to the DLQ and the chain
// stays empty.
#[tokio::test]
async fn sqlite_sink_lands_event_in_dlq_on_unrecoverable_error() {
    let dir = fresh_tempdir();
    let db_path = dir.path().join("dlq-test.db");
    let _ = crate::db::open(&db_path).expect("init db");

    // Fault injection: replace `signed_events` with a table that has no
    // `event_type` column.
    let broken_schema = "DROP TABLE signed_events; \
         CREATE TABLE signed_events ( \
         id TEXT PRIMARY KEY, \
         agent_id TEXT NOT NULL, \
         payload_hash BLOB NOT NULL, \
         signature BLOB, \
         attest_level TEXT NOT NULL DEFAULT 'unsigned', \
         timestamp TEXT NOT NULL, \
         prev_hash BLOB, \
         sequence INTEGER \
         ); \
         CREATE UNIQUE INDEX idx_signed_events_sequence \
         ON signed_events(sequence);";
    {
        let conn = crate::db::open(&db_path).expect("open for fault inject");
        conn.execute_batch(broken_schema)
            .expect("fault-inject schema rewrite");
    }

    let (queue, supervisor) = install_deferred_audit_drainer(&db_path);
    let metrics = queue.metrics();
    let (action, decision) = (refusal_action(), refusal_decision());
    let event = DeferredAuditEvent::from_refusal("agent:dlq-test", &action, &decision).unwrap();
    queue.submit(event);
    close_and_flush(queue, supervisor).await.unwrap();

    let conn = crate::db::open(&db_path).expect("reopen db");
    let chain_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM signed_events", [], |row| row.get(0))
        .unwrap_or(0);
    assert_eq!(
        chain_rows, 0,
        "signed_events chain MUST NOT advance when sink fails"
    );
    let dlq_rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM signed_events_dlq", [], |row| row.get(0))
        .unwrap();
    assert_eq!(dlq_rows, 1, "exactly one DLQ row expected");
    let reported = dlq_size(&conn).expect("dlq_size");
    assert_eq!(reported, 1, "dlq_size helper must reflect live count");
    assert!(
        metrics.dlq_landed_count() >= 1,
        "dlq_landed metric must bump on DLQ landing"
    );
}
// Only a wrapped `SQLITE_CONSTRAINT_UNIQUE` failure counts as a race.
#[test]
fn is_unique_constraint_race_classifies_correctly() {
    // Builds a context-wrapped SQLite failure with the given result code.
    let wrapped = |code: std::os::raw::c_int, message: &str| -> anyhow::Error {
        let raw = rusqlite::Error::SqliteFailure(
            rusqlite::ffi::Error::new(code),
            Some(message.to_string()),
        );
        anyhow::Error::from(raw).context("append signed_event")
    };

    let unique = wrapped(
        rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE,
        "UNIQUE constraint failed",
    );
    let busy = wrapped(rusqlite::ffi::SQLITE_BUSY, "database is locked");
    let plain: anyhow::Error = anyhow::anyhow!("plain error");

    assert!(is_unique_constraint_race(&unique));
    assert!(!is_unique_constraint_race(&busy));
    assert!(!is_unique_constraint_race(&plain));
}
// A newly initialised database has an empty DLQ.
#[tokio::test]
async fn dlq_size_returns_zero_on_fresh_db() {
    let dir = fresh_tempdir();
    let conn = crate::db::open(&dir.path().join("dlq-empty.db")).expect("init db");
    let size = dlq_size(&conn).expect("dlq_size on fresh");
    assert_eq!(size, 0);
}
// Every counter of a default metrics value reads zero.
#[test]
fn metrics_default_returns_zeroes() {
    let m = DeferredAuditMetrics::default();
    let readings = [
        m.submitted_count(),
        m.appended_count(),
        m.send_failure_count(),
        m.append_failure_count(),
        m.panic_count(),
        m.dlq_landed_count(),
        m.unique_race_retry_count(),
    ];
    for reading in readings {
        assert_eq!(reading, 0);
    }
}
// A clone observes increments made through the original handle.
#[test]
fn metrics_clone_shares_counters() {
    let original = DeferredAuditMetrics::default();
    let alias = original.clone();
    original.submitted.fetch_add(7, Ordering::Relaxed);
    assert_eq!(alias.submitted_count(), 7);
}
// Pins the event-type string so a rename shows up as a test failure.
#[test]
fn governance_refusal_event_type_is_stable() {
    let expected = "governance.refusal";
    assert_eq!(GOVERNANCE_REFUSAL_EVENT_TYPE, expected);
}
}