use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
pub const SCHEMA_VERSION: u32 = 3;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct CallEvent {
pub ts: String,
pub call_id: String,
pub tool_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub caller_id: Option<String>,
pub granted_capabilities: Vec<String>,
pub duration_ms: u64,
pub outcome: Outcome,
pub tier: String,
pub dry_run: bool,
pub schema_version: u32,
#[serde(default)]
pub secrets_resolved: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cursor_page: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub capability_provenance: Option<Vec<CapProvenance>>,
}
impl CallEvent {
pub fn new(
ts: impl Into<String>,
call_id: impl Into<String>,
tool_id: impl Into<String>,
duration_ms: u64,
outcome: Outcome,
tier: impl Into<String>,
) -> Self {
Self {
ts: ts.into(),
call_id: call_id.into(),
tool_id: tool_id.into(),
caller_id: None,
granted_capabilities: Vec::new(),
duration_ms,
outcome,
tier: tier.into(),
dry_run: false,
schema_version: SCHEMA_VERSION,
secrets_resolved: false,
cursor_page: None,
capability_provenance: None,
}
}
pub fn with_caller_id(mut self, caller_id: Option<String>) -> Self {
self.caller_id = caller_id;
self
}
pub fn with_granted_capabilities(mut self, caps: Vec<String>) -> Self {
self.granted_capabilities = caps;
self
}
pub fn with_dry_run(mut self, dry_run: bool) -> Self {
self.dry_run = dry_run;
self
}
pub fn with_secrets_resolved(mut self, resolved: bool) -> Self {
self.secrets_resolved = resolved;
self
}
pub fn with_cursor_page(mut self, page: Option<u32>) -> Self {
self.cursor_page = page;
self
}
pub fn with_capability_provenance(mut self, provenance: Option<Vec<CapProvenance>>) -> Self {
self.capability_provenance = provenance;
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CapProvenance {
pub cap: String,
pub source: ProvSource,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProvSource {
StringAllowList,
UcanChain { issuer_did: String, chain_depth: u8 },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Outcome {
Success,
ExecutionFailed { code: String, retryable: bool },
InvalidArgs { message: String },
CapabilityDenied { missing: Vec<String> },
RateLimited { retry_after_ms: Option<u64> },
ToolNotFound,
}
#[derive(Clone)]
pub enum BackpressureStrategy {
Drop,
Block,
FallbackSink(Arc<dyn AuditSink>),
}
impl std::fmt::Debug for BackpressureStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BackpressureStrategy::Drop => f.write_str("Drop"),
BackpressureStrategy::Block => f.write_str("Block"),
BackpressureStrategy::FallbackSink(_) => f.write_str("FallbackSink(..)"),
}
}
}
pub trait AuditSink: Send + Sync {
fn on_call(&self, event: &CallEvent);
fn drops(&self) -> u64 {
0
}
fn backpressure_strategy(&self) -> BackpressureStrategy {
BackpressureStrategy::Drop
}
}
pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1024;
pub struct JsonLinesAuditSink {
tx: Option<std::sync::mpsc::SyncSender<CallEvent>>,
drain: Option<std::thread::JoinHandle<()>>,
drops: Arc<AtomicU64>,
strategy: BackpressureStrategy,
}
impl JsonLinesAuditSink {
pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
}
pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
Self::with_strategy(writer, capacity, BackpressureStrategy::Drop)
}
pub fn with_strategy(
writer: Box<dyn Write + Send + 'static>,
capacity: usize,
strategy: BackpressureStrategy,
) -> Self {
if matches!(strategy, BackpressureStrategy::Block) {
if let Ok(h) = tokio::runtime::Handle::try_current() {
if h.runtime_flavor() == tokio::runtime::RuntimeFlavor::CurrentThread {
eprintln!(
"atd: WARNING — JsonLinesAuditSink Block strategy on a \
current_thread runtime; a blocked worker can stall accept \
under audit backpressure. Prefer a multi-thread runtime."
);
}
}
}
let (tx, rx) = std::sync::mpsc::sync_channel::<CallEvent>(capacity);
let drops = Arc::new(AtomicU64::new(0));
let mut writer = writer;
let drain = std::thread::spawn(move || {
while let Ok(ev) = rx.recv() {
if let Ok(mut line) = serde_json::to_vec(&ev) {
line.push(b'\n');
let _ = writer.write_all(&line);
let _ = writer.flush();
}
}
let _ = writer.flush();
});
Self {
tx: Some(tx),
drain: Some(drain),
drops,
strategy,
}
}
pub fn stdout() -> Self {
Self::new(Box::new(std::io::stdout()))
}
pub fn stderr() -> Self {
Self::new(Box::new(std::io::stderr()))
}
pub fn file(path: &Path) -> std::io::Result<Self> {
let f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
Ok(Self::new(Box::new(f)))
}
pub fn drops(&self) -> u64 {
self.drops.load(Ordering::Relaxed)
}
}
impl AuditSink for JsonLinesAuditSink {
fn on_call(&self, event: &CallEvent) {
let Some(tx) = self.tx.as_ref() else {
self.drops.fetch_add(1, Ordering::Relaxed);
return;
};
match &self.strategy {
BackpressureStrategy::Drop => {
if tx.try_send(event.clone()).is_err() {
self.drops.fetch_add(1, Ordering::Relaxed);
}
}
BackpressureStrategy::Block => {
if tx.send(event.clone()).is_err() {
self.drops.fetch_add(1, Ordering::Relaxed);
}
}
BackpressureStrategy::FallbackSink(fb) => {
if tx.try_send(event.clone()).is_err() {
fb.on_call(event);
}
}
}
}
fn drops(&self) -> u64 {
self.drops.load(Ordering::Relaxed)
}
fn backpressure_strategy(&self) -> BackpressureStrategy {
self.strategy.clone()
}
}
impl Drop for JsonLinesAuditSink {
fn drop(&mut self) {
self.tx.take(); if let Some(h) = self.drain.take() {
let _ = h.join();
}
}
}
pub fn now_rfc3339() -> String {
Utc::now().to_rfc3339()
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
fn mk_event(outcome: Outcome) -> CallEvent {
CallEvent::new(
now_rfc3339(),
"01J000000000000000000000TEST",
"ref:echo.say",
17,
outcome,
"warm",
)
.with_caller_id(Some("test-client".into()))
.with_granted_capabilities(vec!["read".into(), "write".into()])
}
#[test]
fn callevent_builder_defaults_then_setters() {
let e = CallEvent::new(now_rfc3339(), "cid", "tool:x", 5, Outcome::Success, "warm");
assert_eq!(e.tool_id, "tool:x");
assert_eq!(e.duration_ms, 5);
assert_eq!(e.schema_version, SCHEMA_VERSION);
assert!(e.caller_id.is_none());
assert!(e.granted_capabilities.is_empty());
assert!(!e.dry_run);
assert!(!e.secrets_resolved);
assert!(e.cursor_page.is_none());
assert!(e.capability_provenance.is_none());
let e2 = e
.with_caller_id(Some("agent-A".into()))
.with_cursor_page(Some(2))
.with_secrets_resolved(true);
assert_eq!(e2.caller_id.as_deref(), Some("agent-A"));
assert_eq!(e2.cursor_page, Some(2));
assert!(e2.secrets_resolved);
}
#[test]
fn success_event_serializes() {
let e = mk_event(Outcome::Success);
let j: serde_json::Value =
serde_json::from_slice(&serde_json::to_vec(&e).expect("serialize")).expect("parse");
assert_eq!(j["tool_id"], "ref:echo.say");
assert_eq!(j["outcome"]["kind"], "success");
assert_eq!(j["schema_version"], 3);
assert_eq!(j["dry_run"], false);
}
#[test]
fn capability_denied_outcome_tagged_correctly() {
let e = mk_event(Outcome::CapabilityDenied {
missing: vec!["conformance.denied".into()],
});
let j: serde_json::Value =
serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
assert_eq!(j["outcome"]["kind"], "capability_denied");
assert_eq!(j["outcome"]["missing"][0], "conformance.denied");
}
#[test]
fn execution_failed_carries_code_and_retryable() {
let e = mk_event(Outcome::ExecutionFailed {
code: "FS_NOT_FOUND".into(),
retryable: false,
});
let j: serde_json::Value =
serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
assert_eq!(j["outcome"]["kind"], "execution_failed");
assert_eq!(j["outcome"]["code"], "FS_NOT_FOUND");
assert_eq!(j["outcome"]["retryable"], false);
}
#[test]
fn rate_limited_outcome_with_null_retry_after() {
let e = mk_event(Outcome::RateLimited {
retry_after_ms: None,
});
let j: serde_json::Value =
serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
assert_eq!(j["outcome"]["kind"], "rate_limited");
assert!(j["outcome"]["retry_after_ms"].is_null());
}
#[test]
fn capability_provenance_roundtrips_both_sources() {
let mut e = mk_event(Outcome::Success);
e.capability_provenance = Some(vec![
CapProvenance {
cap: "records:read".into(),
source: ProvSource::StringAllowList,
},
CapProvenance {
cap: "records:write".into(),
source: ProvSource::UcanChain {
issuer_did: "did:key:zABC".into(),
chain_depth: 1,
},
},
]);
let j: serde_json::Value =
serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
let prov = j["capability_provenance"].as_array().unwrap();
assert_eq!(prov[0]["cap"], "records:read");
assert_eq!(prov[0]["source"]["kind"], "string_allow_list");
assert_eq!(prov[1]["source"]["kind"], "ucan_chain");
assert_eq!(prov[1]["source"]["issuer_did"], "did:key:zABC");
assert_eq!(prov[1]["source"]["chain_depth"], 1);
}
#[test]
fn provenance_skipped_when_none() {
let e = mk_event(Outcome::Success);
let s = serde_json::to_string(&e).unwrap();
assert!(
!s.contains("capability_provenance"),
"None provenance must be omitted on the wire (back-compat), got: {s}"
);
}
#[test]
fn v2_event_without_provenance_deserializes_to_none() {
let j = r#"{"ts":"2026-05-29T00:00:00+00:00","call_id":"01J","tool_id":"x",
"granted_capabilities":[],"duration_ms":1,"outcome":{"kind":"success"},
"tier":"warm","dry_run":false,"schema_version":2,"secrets_resolved":false}"#;
let e: CallEvent = serde_json::from_str(j).unwrap();
assert!(e.capability_provenance.is_none());
assert!(e.cursor_page.is_none());
}
#[test]
fn caller_id_skipped_when_none() {
let mut e = mk_event(Outcome::Success);
e.caller_id = None;
let s = serde_json::to_string(&e).unwrap();
assert!(
!s.contains("caller_id"),
"caller_id None should be skipped, got: {}",
s
);
}
struct SharedBuf(Arc<Mutex<Vec<u8>>>);
impl Write for SharedBuf {
fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(bs);
Ok(bs.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
async fn wait_for_lines(
buf: &Arc<Mutex<Vec<u8>>>,
target_lines: usize,
timeout: std::time::Duration,
) -> Vec<u8> {
let deadline = std::time::Instant::now() + timeout;
loop {
{
let guard = buf.lock().unwrap();
let count = guard.iter().filter(|b| **b == b'\n').count();
if count >= target_lines || std::time::Instant::now() > deadline {
return guard.clone();
}
}
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
}
#[tokio::test]
async fn json_lines_sink_writes_one_line_per_event() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
sink.on_call(&mk_event(Outcome::Success));
sink.on_call(&mk_event(Outcome::ToolNotFound));
let out = wait_for_lines(&buf, 2, std::time::Duration::from_millis(500)).await;
let text = String::from_utf8(out).unwrap();
let lines: Vec<&str> = text.split_terminator('\n').collect();
assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
for line in &lines {
let _: CallEvent = serde_json::from_str(line).expect("each line parses as CallEvent");
}
}
#[tokio::test]
async fn on_call_is_non_blocking_under_burst() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf)));
let ev = mk_event(Outcome::Success);
let started = std::time::Instant::now();
for _ in 0..100 {
sink.on_call(&ev);
}
let elapsed = started.elapsed();
assert!(
elapsed < std::time::Duration::from_millis(50),
"100 on_call invocations took {elapsed:?}; expected <50ms"
);
}
#[test]
fn drops_counter_increments_when_channel_full() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::new_with_capacity(
Box::new(SlowBuf {
inner: buf,
delay: std::time::Duration::from_millis(2),
}),
4,
);
let ev = mk_event(Outcome::Success);
for _ in 0..200 {
sink.on_call(&ev);
}
assert!(
sink.drops() > 0,
"expected drops at capacity=4 with a 200-event burst against a slow drain, got 0"
);
}
#[tokio::test]
async fn events_eventually_drain_to_writer() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
let ev = mk_event(Outcome::Success);
for _ in 0..10 {
sink.on_call(&ev);
}
let out = wait_for_lines(&buf, 10, std::time::Duration::from_millis(500)).await;
let text = String::from_utf8(out).unwrap();
let lines: Vec<&str> = text.split_terminator('\n').collect();
assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
}
#[tokio::test]
async fn dropping_sink_drains_pending_then_exits() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
{
let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
for _ in 0..5 {
sink.on_call(&mk_event(Outcome::Success));
}
}
let out = wait_for_lines(&buf, 5, std::time::Duration::from_millis(500)).await;
let lines: Vec<&str> = std::str::from_utf8(&out)
.unwrap()
.split_terminator('\n')
.collect();
assert_eq!(lines.len(), 5, "drop should flush the last 5 events");
}
struct SlowBuf {
inner: Arc<Mutex<Vec<u8>>>,
delay: std::time::Duration,
}
impl Write for SlowBuf {
fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
std::thread::sleep(self.delay);
self.inner.lock().unwrap().extend_from_slice(bs);
Ok(bs.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
#[test]
fn bare_sink_defaults_to_drop_strategy() {
struct Bare;
impl AuditSink for Bare {
fn on_call(&self, _: &CallEvent) {}
}
assert!(matches!(
Bare.backpressure_strategy(),
BackpressureStrategy::Drop
));
}
#[test]
fn with_strategy_block_reports_block() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::with_strategy(
Box::new(SharedBuf(buf)),
16,
BackpressureStrategy::Block,
);
assert!(matches!(
sink.backpressure_strategy(),
BackpressureStrategy::Block
));
}
#[test]
fn block_strategy_loses_nothing_under_burst() {
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::with_strategy(
Box::new(SlowBuf {
inner: buf.clone(),
delay: std::time::Duration::from_micros(50),
}),
4,
BackpressureStrategy::Block,
);
let ev = mk_event(Outcome::Success);
for _ in 0..100 {
sink.on_call(&ev);
}
assert_eq!(sink.drops(), 0, "Block strategy must never drop");
drop(sink); let n = buf.lock().unwrap().iter().filter(|b| **b == b'\n').count();
assert_eq!(
n, 100,
"Block must flush all 100 events by the time drop returns"
);
}
#[test]
fn fallback_strategy_routes_overflow_to_fallback() {
struct CountSink(Arc<AtomicU64>);
impl AuditSink for CountSink {
fn on_call(&self, _: &CallEvent) {
self.0.fetch_add(1, Ordering::Relaxed);
}
}
let fb_count = Arc::new(AtomicU64::new(0));
let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
let sink = JsonLinesAuditSink::with_strategy(
Box::new(SlowBuf {
inner: buf,
delay: std::time::Duration::from_millis(5),
}),
1,
BackpressureStrategy::FallbackSink(Arc::new(CountSink(fb_count.clone()))),
);
let ev = mk_event(Outcome::Success);
for _ in 0..50 {
sink.on_call(&ev);
}
assert_eq!(sink.drops(), 0, "fallback caught overflow; primary drops 0");
assert!(
fb_count.load(Ordering::Relaxed) > 0,
"fallback sink must catch the overflow events"
);
}
#[test]
fn now_rfc3339_format_is_parseable() {
let s = now_rfc3339();
chrono::DateTime::parse_from_rfc3339(&s).expect("RFC 3339 parseable");
}
}