use chrono::Utc;
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Version number of the audit event schema.
///
/// The tests in this file stamp it into `CallEvent::schema_version`.
pub const SCHEMA_VERSION: u32 = 2;
/// One audit record describing a single call, serializable with serde.
///
/// Fields are emitted in declaration order; see the per-field notes for the
/// ones that are optional on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallEvent {
/// Timestamp as a string. The tests fill it with `now_rfc3339()`.
pub ts: String,
/// Identifier of the call (an opaque string as far as this file is concerned).
pub call_id: String,
/// Identifier of the tool involved in the call.
pub tool_id: String,
/// Optional caller identifier; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub caller_id: Option<String>,
/// Capability names attached to the call.
/// NOTE(review): presumably the capabilities granted to the caller — confirm.
pub granted_capabilities: Vec<String>,
/// Call duration; milliseconds, going by the field name.
pub duration_ms: u64,
/// How the call ended.
pub outcome: Outcome,
/// Tier label. Its meaning is not visible in this file (the tests use "warm").
pub tier: String,
/// Dry-run flag.
/// NOTE(review): presumably true when the call was not really executed — confirm.
pub dry_run: bool,
/// Schema version of this record; the tests set it to `SCHEMA_VERSION`.
pub schema_version: u32,
/// Deserializes as `false` when the field is absent from the input.
#[serde(default)]
pub secrets_resolved: bool,
/// Optional page number: `None` when absent from the input, and left out of
/// the serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cursor_page: Option<u32>,
}
/// Result of a call as recorded in a `CallEvent`.
///
/// Serialized as an object whose `kind` field holds the variant name in
/// snake_case (for example `"capability_denied"`); any variant fields sit
/// next to `kind` in the same object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Outcome {
/// The call succeeded.
Success,
/// Execution failed; carries an error `code` and a `retryable` flag.
ExecutionFailed { code: String, retryable: bool },
/// The arguments were rejected; `message` describes the problem.
InvalidArgs { message: String },
/// The call was denied; `missing` lists capability names.
/// NOTE(review): presumably the capabilities the caller lacked — confirm.
CapabilityDenied { missing: Vec<String> },
/// The call was rate limited. `retry_after_ms` is optional and serializes
/// as `null` when `None`.
RateLimited { retry_after_ms: Option<u64> },
/// The requested tool was not found.
ToolNotFound,
}
/// Receiver of `CallEvent`s. Implementations must be `Send + Sync`.
pub trait AuditSink: Send + Sync {
/// Handles one call event.
fn on_call(&self, event: &CallEvent);
/// Number of events this sink has dropped.
///
/// The default implementation always reports `0`.
fn drops(&self) -> u64 {
0
}
}
/// Channel capacity (in events) that `JsonLinesAuditSink::new` passes to
/// `JsonLinesAuditSink::new_with_capacity`.
pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1024;
/// Audit sink that hands events to a background task through a bounded
/// Tokio channel; the task writes each event as one line of JSON.
pub struct JsonLinesAuditSink {
// Sending half of the bounded channel feeding the writer task.
tx: tokio::sync::mpsc::Sender<CallEvent>,
// Incremented each time an event cannot be queued (`try_send` fails).
drops: Arc<AtomicU64>,
}
impl JsonLinesAuditSink {
    /// Creates a sink writing to `writer` with a queue of
    /// [`DEFAULT_AUDIT_QUEUE_CAPACITY`] events.
    ///
    /// # Panics
    ///
    /// Spawns a task with `tokio::spawn`, which panics when called outside a
    /// Tokio runtime.
    pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
        Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
    }

    /// Creates a sink writing to `writer` with a queue holding up to
    /// `capacity` events.
    ///
    /// A background task receives queued events, serializes each one to JSON,
    /// appends a newline, writes it and flushes. Serialization and write
    /// errors are ignored: the affected event is skipped and the drop counter
    /// is not touched. The task ends, after a final flush, once every sender
    /// is gone and the queue is empty.
    ///
    /// A `capacity` of `0` is treated as `1`, because the underlying Tokio
    /// channel panics on a zero-sized buffer.
    ///
    /// # Panics
    ///
    /// Spawns a task with `tokio::spawn`, which panics when called outside a
    /// Tokio runtime.
    pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
        // `tokio::sync::mpsc::channel` panics if the buffer size is 0, so
        // clamp instead of letting a misconfigured capacity abort the caller.
        let capacity = capacity.max(1);
        let (tx, mut rx) = tokio::sync::mpsc::channel::<CallEvent>(capacity);
        let drops = Arc::new(AtomicU64::new(0));
        let mut writer = writer;
        tokio::spawn(async move {
            while let Some(ev) = rx.recv().await {
                if let Ok(mut line) = serde_json::to_vec(&ev) {
                    line.push(b'\n');
                    // Best effort: a failing writer must not take the task down.
                    let _ = writer.write_all(&line);
                    // Flush per event so each record reaches the writer promptly.
                    let _ = writer.flush();
                }
            }
            let _ = writer.flush();
        });
        Self { tx, drops }
    }

    /// Creates a sink that writes to standard output.
    pub fn stdout() -> Self {
        Self::new(Box::new(std::io::stdout()))
    }

    /// Creates a sink that writes to standard error.
    pub fn stderr() -> Self {
        Self::new(Box::new(std::io::stderr()))
    }

    /// Creates a sink that appends to the file at `path`, creating the file
    /// if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening the file.
    pub fn file(path: &Path) -> std::io::Result<Self> {
        let f = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Self::new(Box::new(f)))
    }

    /// Number of events that could not be queued so far.
    pub fn drops(&self) -> u64 {
        self.drops.load(Ordering::Relaxed)
    }
}
impl AuditSink for JsonLinesAuditSink {
    /// Tries to queue a clone of `event` without waiting; when the attempt
    /// fails, the event is discarded and the drop counter goes up by one.
    fn on_call(&self, event: &CallEvent) {
        let queued = self.tx.try_send(event.clone());
        if queued.is_err() {
            self.drops.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Current value of the drop counter.
    fn drops(&self) -> u64 {
        let counter = &self.drops;
        counter.load(Ordering::Relaxed)
    }
}
/// Returns the current UTC time formatted as an RFC 3339 string.
pub fn now_rfc3339() -> String {
    let now = Utc::now();
    now.to_rfc3339()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    /// Builds a fully populated `CallEvent` carrying the given `outcome`.
    fn mk_event(outcome: Outcome) -> CallEvent {
        let granted_capabilities = vec![String::from("read"), String::from("write")];
        CallEvent {
            ts: now_rfc3339(),
            call_id: String::from("01J000000000000000000000TEST"),
            tool_id: String::from("ref:echo.say"),
            caller_id: Some(String::from("test-client")),
            granted_capabilities,
            duration_ms: 17,
            outcome,
            tier: String::from("warm"),
            dry_run: false,
            schema_version: SCHEMA_VERSION,
            secrets_resolved: false,
            cursor_page: None,
        }
    }

    /// Serializes `event` to JSON bytes and parses them back as a generic value.
    fn roundtrip_json(event: &CallEvent) -> serde_json::Value {
        let bytes = serde_json::to_vec(event).unwrap();
        serde_json::from_slice(&bytes).unwrap()
    }

    /// Fresh, empty byte buffer that can be shared with a `SharedBuf`.
    fn new_buf() -> Arc<Mutex<Vec<u8>>> {
        Arc::new(Mutex::new(Vec::<u8>::new()))
    }

    #[test]
    fn success_event_serializes() {
        let event = mk_event(Outcome::Success);
        let bytes = serde_json::to_vec(&event).expect("serialize");
        let json: serde_json::Value = serde_json::from_slice(&bytes).expect("parse");
        assert_eq!(json["tool_id"], "ref:echo.say");
        assert_eq!(json["outcome"]["kind"], "success");
        assert_eq!(json["schema_version"], 2);
        assert_eq!(json["dry_run"], false);
    }

    #[test]
    fn capability_denied_outcome_tagged_correctly() {
        let missing = vec![String::from("conformance.denied")];
        let json = roundtrip_json(&mk_event(Outcome::CapabilityDenied { missing }));
        let outcome = &json["outcome"];
        assert_eq!(outcome["kind"], "capability_denied");
        assert_eq!(outcome["missing"][0], "conformance.denied");
    }

    #[test]
    fn execution_failed_carries_code_and_retryable() {
        let failure = Outcome::ExecutionFailed {
            code: String::from("FS_NOT_FOUND"),
            retryable: false,
        };
        let json = roundtrip_json(&mk_event(failure));
        let outcome = &json["outcome"];
        assert_eq!(outcome["kind"], "execution_failed");
        assert_eq!(outcome["code"], "FS_NOT_FOUND");
        assert_eq!(outcome["retryable"], false);
    }

    #[test]
    fn rate_limited_outcome_with_null_retry_after() {
        let limited = Outcome::RateLimited {
            retry_after_ms: None,
        };
        let json = roundtrip_json(&mk_event(limited));
        let outcome = &json["outcome"];
        assert_eq!(outcome["kind"], "rate_limited");
        assert!(outcome["retry_after_ms"].is_null());
    }

    #[test]
    fn caller_id_skipped_when_none() {
        let event = CallEvent {
            caller_id: None,
            ..mk_event(Outcome::Success)
        };
        let rendered = serde_json::to_string(&event).unwrap();
        assert!(
            !rendered.contains("caller_id"),
            "caller_id None should be skipped, got: {}",
            rendered
        );
    }

    /// `Write` implementation that appends into a shared in-memory buffer so
    /// the test can inspect what the sink's writer task produced.
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl Write for SharedBuf {
        fn write(&mut self, chunk: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.lock().unwrap();
            bytes.extend_from_slice(chunk);
            Ok(chunk.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Polls `buf` every 5 ms until it holds at least `target_lines` newline
    /// bytes or `timeout` has elapsed, then returns a copy of its contents.
    async fn wait_for_lines(
        buf: &Arc<Mutex<Vec<u8>>>,
        target_lines: usize,
        timeout: Duration,
    ) -> Vec<u8> {
        let deadline = Instant::now() + timeout;
        loop {
            // The lock guard lives only inside this block, so it is released
            // before the task awaits the sleep below.
            let snapshot = {
                let guard = buf.lock().unwrap();
                let newlines = guard.iter().filter(|&&b| b == b'\n').count();
                let done = newlines >= target_lines || Instant::now() > deadline;
                done.then(|| guard.clone())
            };
            if let Some(bytes) = snapshot {
                return bytes;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test]
    async fn json_lines_sink_writes_one_line_per_event() {
        let buf = new_buf();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        sink.on_call(&mk_event(Outcome::Success));
        sink.on_call(&mk_event(Outcome::ToolNotFound));
        let written = wait_for_lines(&buf, 2, Duration::from_millis(500)).await;
        let text = String::from_utf8(written).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
        lines.iter().for_each(|line| {
            serde_json::from_str::<CallEvent>(line).expect("each line parses as CallEvent");
        });
    }

    #[tokio::test]
    async fn on_call_is_non_blocking_under_burst() {
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(new_buf())));
        let event = mk_event(Outcome::Success);
        let started = Instant::now();
        (0..100).for_each(|_| sink.on_call(&event));
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(50),
            "100 on_call invocations took {elapsed:?}; expected <50ms"
        );
    }

    #[tokio::test]
    async fn drops_counter_increments_when_channel_full() {
        let sink = JsonLinesAuditSink::new_with_capacity(Box::new(SharedBuf(new_buf())), 4);
        let event = mk_event(Outcome::Success);
        (0..200).for_each(|_| sink.on_call(&event));
        let dropped = sink.drops();
        assert!(
            dropped > 0,
            "expected some drops at capacity=4 with 200-event burst, got 0"
        );
    }

    #[tokio::test]
    async fn events_eventually_drain_to_writer() {
        let buf = new_buf();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        let event = mk_event(Outcome::Success);
        (0..10).for_each(|_| sink.on_call(&event));
        let written = wait_for_lines(&buf, 10, Duration::from_millis(500)).await;
        let text = String::from_utf8(written).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
    }

    #[tokio::test]
    async fn dropping_sink_drains_pending_then_exits() {
        let buf = new_buf();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        for _ in 0..5 {
            sink.on_call(&mk_event(Outcome::Success));
        }
        // Dropping the sink closes the channel; queued events must still be written.
        drop(sink);
        let written = wait_for_lines(&buf, 5, Duration::from_millis(500)).await;
        let text = std::str::from_utf8(&written).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 5, "drop should flush the last 5 events");
    }

    #[test]
    fn now_rfc3339_format_is_parseable() {
        let stamp = now_rfc3339();
        chrono::DateTime::parse_from_rfc3339(&stamp).expect("RFC 3339 parseable");
    }
}