1use chrono::Utc;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::Path;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
/// Version number of the audit event schema.
///
/// The tests in this file stamp it into `CallEvent::schema_version` and expect
/// it to serialize as `3`.
pub const SCHEMA_VERSION: u32 = 3;
41
/// A single audit record describing one tool call.
///
/// The struct round-trips through serde. Fields marked `#[serde(default)]` may
/// be absent from input: the tests in this file parse a `schema_version: 2`
/// record that lacks `cursor_page` and `capability_provenance`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallEvent {
    /// Timestamp as a string. The tests populate it with `now_rfc3339()`;
    /// presumably always RFC 3339 — TODO confirm with the producers.
    pub ts: String,
    /// Identifier of the recorded call (plain string; no format is enforced here).
    pub call_id: String,
    /// Identifier of the invoked tool (plain string; no format is enforced here).
    pub tool_id: String,
    /// Caller identity, if known. Left out of the serialized form when `None`;
    /// the v2 fixture in the tests omits it and still deserializes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub caller_id: Option<String>,
    /// Capability names attached to the call — presumably those granted to the
    /// caller; verify against the code that builds the event.
    pub granted_capabilities: Vec<String>,
    /// Call duration in milliseconds (per the field name).
    pub duration_ms: u64,
    /// How the call ended; see [`Outcome`].
    pub outcome: Outcome,
    /// Free-form tier label (the tests use `"warm"`); its meaning is not
    /// visible in this file.
    pub tier: String,
    /// Dry-run flag; its exact semantics are defined by the producer.
    pub dry_run: bool,
    /// Schema version of this record; the tests set it to [`SCHEMA_VERSION`].
    pub schema_version: u32,
    /// Falls back to `false` when the field is missing on deserialization.
    #[serde(default)]
    pub secrets_resolved: bool,
    /// Optional page number. Missing input yields `None`, and `None` is not
    /// serialized.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cursor_page: Option<u32>,
    /// Optional per-capability provenance. Missing input yields `None`, and
    /// `None` is not serialized (the tests rely on this for back-compat).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub capability_provenance: Option<Vec<CapProvenance>>,
}
81
/// Provenance entry: a capability name paired with the [`ProvSource`] recorded
/// for it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CapProvenance {
    /// Capability name (the tests use values such as `"records:read"`).
    pub cap: String,
    /// Where the capability came from.
    pub source: ProvSource,
}
90
/// Source of a capability, serialized as an internally tagged object whose
/// `kind` field holds the snake_case variant name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProvSource {
    /// Serializes as `{"kind":"string_allow_list"}`. Presumably a capability
    /// taken from a plain string allow-list — confirm with the producer.
    StringAllowList,
    /// Serializes as `{"kind":"ucan_chain","issuer_did":…,"chain_depth":…}`.
    /// Presumably a capability derived from a UCAN delegation chain — confirm
    /// with the producer.
    UcanChain { issuer_did: String, chain_depth: u8 },
}
103
/// Result of a tool call, serialized as an internally tagged object whose
/// `kind` field holds the snake_case variant name (for example `"success"` or
/// `"capability_denied"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Outcome {
    /// The call succeeded.
    Success,
    /// Execution failed; carries an error `code` and a `retryable` flag.
    ExecutionFailed { code: String, retryable: bool },
    /// The arguments were rejected; `message` carries the explanation.
    InvalidArgs { message: String },
    /// The call was denied; `missing` lists capability names.
    CapabilityDenied { missing: Vec<String> },
    /// The call was rate limited. A `None` hint serializes as JSON `null`
    /// (asserted by the tests in this file).
    RateLimited { retry_after_ms: Option<u64> },
    /// No tool matched the request.
    ToolNotFound,
}
115
/// What a sink does with an event when its queue cannot accept it.
///
/// The variant notes describe how `JsonLinesAuditSink::on_call` in this file
/// interprets each strategy; other sinks may treat the value differently.
#[derive(Clone)]
pub enum BackpressureStrategy {
    /// Non-blocking enqueue; an event that cannot be queued is counted as a
    /// drop and discarded.
    Drop,
    /// Blocking enqueue; the caller waits until the queue has room.
    Block,
    /// Non-blocking enqueue; an event that cannot be queued is forwarded to
    /// the wrapped sink instead of being counted as a drop.
    FallbackSink(Arc<dyn AuditSink>),
}
137
138impl std::fmt::Debug for BackpressureStrategy {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 match self {
141 BackpressureStrategy::Drop => f.write_str("Drop"),
142 BackpressureStrategy::Block => f.write_str("Block"),
143 BackpressureStrategy::FallbackSink(_) => f.write_str("FallbackSink(..)"),
144 }
145 }
146}
147
/// Receiver of audit [`CallEvent`]s.
///
/// Implementations must be `Send + Sync`.
pub trait AuditSink: Send + Sync {
    /// Records one event. The event is borrowed, so an implementation that
    /// needs to keep it must clone it.
    fn on_call(&self, event: &CallEvent);
    /// Number of events this sink has dropped. The default implementation
    /// always reports `0`.
    fn drops(&self) -> u64 {
        0
    }
    /// Backpressure strategy this sink applies. The default implementation
    /// reports [`BackpressureStrategy::Drop`].
    fn backpressure_strategy(&self) -> BackpressureStrategy {
        BackpressureStrategy::Drop
    }
}
170
/// Queue capacity (in events) that `JsonLinesAuditSink::new` passes to
/// `JsonLinesAuditSink::new_with_capacity`.
pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1024;
175
/// [`AuditSink`] that writes each [`CallEvent`] as one JSON object per line.
///
/// `on_call` hands events over a bounded channel to a dedicated thread that
/// owns the writer, so the caller does not perform the write itself.
pub struct JsonLinesAuditSink {
    /// Sending half of the bounded queue. Held in an `Option` so that `Drop`
    /// can take and close it before joining the drain thread.
    tx: Option<std::sync::mpsc::SyncSender<CallEvent>>,
    /// Handle of the thread that writes queued events; joined in `Drop`.
    drain: Option<std::thread::JoinHandle<()>>,
    /// Lost-event counter; `on_call` increments it when an event cannot be
    /// queued. Read through `drops()`.
    drops: Arc<AtomicU64>,
    /// Strategy `on_call` follows when the queue cannot accept an event.
    strategy: BackpressureStrategy,
}
201
202impl JsonLinesAuditSink {
203 pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
206 Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
207 }
208
209 pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
211 Self::with_strategy(writer, capacity, BackpressureStrategy::Drop)
212 }
213
214 pub fn with_strategy(
218 writer: Box<dyn Write + Send + 'static>,
219 capacity: usize,
220 strategy: BackpressureStrategy,
221 ) -> Self {
222 if matches!(strategy, BackpressureStrategy::Block) {
227 if let Ok(h) = tokio::runtime::Handle::try_current() {
228 if h.runtime_flavor() == tokio::runtime::RuntimeFlavor::CurrentThread {
229 eprintln!(
230 "atd: WARNING — JsonLinesAuditSink Block strategy on a \
231 current_thread runtime; a blocked worker can stall accept \
232 under audit backpressure. Prefer a multi-thread runtime."
233 );
234 }
235 }
236 }
237 let (tx, rx) = std::sync::mpsc::sync_channel::<CallEvent>(capacity);
238 let drops = Arc::new(AtomicU64::new(0));
239 let mut writer = writer;
240 let drain = std::thread::spawn(move || {
241 while let Ok(ev) = rx.recv() {
242 if let Ok(mut line) = serde_json::to_vec(&ev) {
243 line.push(b'\n');
244 let _ = writer.write_all(&line);
245 let _ = writer.flush();
246 }
247 }
248 let _ = writer.flush();
250 });
251 Self {
252 tx: Some(tx),
253 drain: Some(drain),
254 drops,
255 strategy,
256 }
257 }
258
259 pub fn stdout() -> Self {
260 Self::new(Box::new(std::io::stdout()))
261 }
262
263 pub fn stderr() -> Self {
264 Self::new(Box::new(std::io::stderr()))
265 }
266
267 pub fn file(path: &Path) -> std::io::Result<Self> {
269 let f = std::fs::OpenOptions::new()
270 .create(true)
271 .append(true)
272 .open(path)?;
273 Ok(Self::new(Box::new(f)))
274 }
275
276 pub fn drops(&self) -> u64 {
279 self.drops.load(Ordering::Relaxed)
280 }
281}
282
283impl AuditSink for JsonLinesAuditSink {
284 fn on_call(&self, event: &CallEvent) {
285 let Some(tx) = self.tx.as_ref() else {
286 self.drops.fetch_add(1, Ordering::Relaxed);
288 return;
289 };
290 match &self.strategy {
291 BackpressureStrategy::Drop => {
292 if tx.try_send(event.clone()).is_err() {
294 self.drops.fetch_add(1, Ordering::Relaxed);
295 }
296 }
297 BackpressureStrategy::Block => {
298 if tx.send(event.clone()).is_err() {
302 self.drops.fetch_add(1, Ordering::Relaxed);
303 }
304 }
305 BackpressureStrategy::FallbackSink(fb) => {
306 if tx.try_send(event.clone()).is_err() {
307 fb.on_call(event);
308 }
309 }
310 }
311 }
312 fn drops(&self) -> u64 {
313 self.drops.load(Ordering::Relaxed)
314 }
315 fn backpressure_strategy(&self) -> BackpressureStrategy {
316 self.strategy.clone()
317 }
318}
319
320impl Drop for JsonLinesAuditSink {
321 fn drop(&mut self) {
328 self.tx.take(); if let Some(h) = self.drain.take() {
330 let _ = h.join();
331 }
332 }
333}
334
335pub fn now_rfc3339() -> String {
339 Utc::now().to_rfc3339()
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    /// Byte buffer shared between a test body and the writer handed to a sink.
    type Shared = Arc<Mutex<Vec<u8>>>;

    // ---------------------------------------------------------------------
    // Fixtures and helpers
    // ---------------------------------------------------------------------

    /// Writer that appends everything it receives to a shared buffer.
    struct SharedBuf(Shared);

    impl Write for SharedBuf {
        fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
            let mut target = self.0.lock().unwrap();
            target.extend_from_slice(bytes);
            Ok(bytes.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Writer that sleeps for `delay` before each write, to keep the queue full.
    struct SlowBuf {
        inner: Shared,
        delay: Duration,
    }

    impl Write for SlowBuf {
        fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
            std::thread::sleep(self.delay);
            let mut target = self.inner.lock().unwrap();
            target.extend_from_slice(bytes);
            Ok(bytes.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Fresh, empty shared buffer.
    fn shared_buffer() -> Shared {
        Arc::new(Mutex::new(Vec::<u8>::new()))
    }

    /// Number of `\n` bytes in `bytes`, i.e. the number of completed lines.
    fn newline_count(bytes: &[u8]) -> usize {
        bytes.iter().filter(|b| **b == b'\n').count()
    }

    /// Serializes `event` and parses it back as a generic JSON value.
    fn as_json(event: &CallEvent) -> serde_json::Value {
        let bytes = serde_json::to_vec(event).unwrap();
        serde_json::from_slice(&bytes).unwrap()
    }

    /// Builds a fully populated event carrying the given `outcome`.
    fn mk_event(outcome: Outcome) -> CallEvent {
        let granted_capabilities: Vec<String> =
            ["read", "write"].iter().copied().map(String::from).collect();
        CallEvent {
            ts: now_rfc3339(),
            call_id: String::from("01J000000000000000000000TEST"),
            tool_id: String::from("ref:echo.say"),
            caller_id: Some(String::from("test-client")),
            granted_capabilities,
            duration_ms: 17,
            outcome,
            tier: String::from("warm"),
            dry_run: false,
            schema_version: SCHEMA_VERSION,
            secrets_resolved: false,
            cursor_page: None,
            capability_provenance: None,
        }
    }

    /// Polls `buf` every 5 ms until it holds `target_lines` newlines or
    /// `timeout` has elapsed, then returns a copy of its contents.
    async fn wait_for_lines(buf: &Shared, target_lines: usize, timeout: Duration) -> Vec<u8> {
        let give_up_at = Instant::now() + timeout;
        loop {
            // The guard lives only inside this block, never across the await.
            let ready = {
                let bytes = buf.lock().unwrap();
                let done = newline_count(&bytes) >= target_lines || Instant::now() > give_up_at;
                done.then(|| bytes.clone())
            };
            if let Some(snapshot) = ready {
                return snapshot;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    // ---------------------------------------------------------------------
    // Serialization shape
    // ---------------------------------------------------------------------

    #[test]
    fn success_event_serializes() {
        let bytes = serde_json::to_vec(&mk_event(Outcome::Success)).expect("serialize");
        let j: serde_json::Value = serde_json::from_slice(&bytes).expect("parse");
        assert_eq!(j["tool_id"], "ref:echo.say");
        assert_eq!(j["outcome"]["kind"], "success");
        assert_eq!(j["schema_version"], 3);
        assert_eq!(j["dry_run"], false);
    }

    #[test]
    fn capability_denied_outcome_tagged_correctly() {
        let denied = Outcome::CapabilityDenied {
            missing: vec![String::from("conformance.denied")],
        };
        let json = as_json(&mk_event(denied));
        let outcome = &json["outcome"];
        assert_eq!(outcome["kind"], "capability_denied");
        assert_eq!(outcome["missing"][0], "conformance.denied");
    }

    #[test]
    fn execution_failed_carries_code_and_retryable() {
        let failed = Outcome::ExecutionFailed {
            code: String::from("FS_NOT_FOUND"),
            retryable: false,
        };
        let json = as_json(&mk_event(failed));
        let outcome = &json["outcome"];
        assert_eq!(outcome["kind"], "execution_failed");
        assert_eq!(outcome["code"], "FS_NOT_FOUND");
        assert_eq!(outcome["retryable"], false);
    }

    #[test]
    fn rate_limited_outcome_with_null_retry_after() {
        let limited = Outcome::RateLimited {
            retry_after_ms: None,
        };
        let json = as_json(&mk_event(limited));
        let outcome = &json["outcome"];
        assert_eq!(outcome["kind"], "rate_limited");
        assert!(outcome["retry_after_ms"].is_null());
    }

    #[test]
    fn capability_provenance_roundtrips_both_sources() {
        let allow_listed = CapProvenance {
            cap: String::from("records:read"),
            source: ProvSource::StringAllowList,
        };
        let delegated = CapProvenance {
            cap: String::from("records:write"),
            source: ProvSource::UcanChain {
                issuer_did: String::from("did:key:zABC"),
                chain_depth: 1,
            },
        };
        let mut e = mk_event(Outcome::Success);
        e.capability_provenance = Some(vec![allow_listed, delegated]);

        let json = as_json(&e);
        let prov = json["capability_provenance"].as_array().unwrap();
        let (first, second) = (&prov[0], &prov[1]);
        assert_eq!(first["cap"], "records:read");
        assert_eq!(first["source"]["kind"], "string_allow_list");
        assert_eq!(second["source"]["kind"], "ucan_chain");
        assert_eq!(second["source"]["issuer_did"], "did:key:zABC");
        assert_eq!(second["source"]["chain_depth"], 1);
    }

    #[test]
    fn provenance_skipped_when_none() {
        let s = serde_json::to_string(&mk_event(Outcome::Success)).unwrap();
        let mentions_provenance = s.contains("capability_provenance");
        assert!(
            !mentions_provenance,
            "None provenance must be omitted on the wire (back-compat), got: {s}"
        );
    }

    #[test]
    fn v2_event_without_provenance_deserializes_to_none() {
        let j = r#"{"ts":"2026-05-29T00:00:00+00:00","call_id":"01J","tool_id":"x",
            "granted_capabilities":[],"duration_ms":1,"outcome":{"kind":"success"},
            "tier":"warm","dry_run":false,"schema_version":2,"secrets_resolved":false}"#;
        let CallEvent {
            capability_provenance,
            cursor_page,
            ..
        } = serde_json::from_str::<CallEvent>(j).unwrap();
        assert!(capability_provenance.is_none());
        assert!(cursor_page.is_none());
    }

    #[test]
    fn caller_id_skipped_when_none() {
        let e = CallEvent {
            caller_id: None,
            ..mk_event(Outcome::Success)
        };
        let s = serde_json::to_string(&e).unwrap();
        assert!(
            !s.contains("caller_id"),
            "caller_id None should be skipped, got: {}",
            s
        );
    }

    // ---------------------------------------------------------------------
    // JsonLinesAuditSink behaviour
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn json_lines_sink_writes_one_line_per_event() {
        let buf = shared_buffer();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        for outcome in [Outcome::Success, Outcome::ToolNotFound] {
            sink.on_call(&mk_event(outcome));
        }

        let out = wait_for_lines(&buf, 2, Duration::from_millis(500)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
        lines.iter().for_each(|line| {
            let _: CallEvent = serde_json::from_str(line).expect("each line parses as CallEvent");
        });
    }

    #[tokio::test]
    async fn on_call_is_non_blocking_under_burst() {
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(shared_buffer())));
        let ev = mk_event(Outcome::Success);
        let started = Instant::now();
        (0..100).for_each(|_| sink.on_call(&ev));
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(50),
            "100 on_call invocations took {elapsed:?}; expected <50ms"
        );
    }

    #[test]
    fn drops_counter_increments_when_channel_full() {
        // A 2 ms writer behind a 4-slot queue cannot keep up with the burst.
        let writer = SlowBuf {
            inner: shared_buffer(),
            delay: Duration::from_millis(2),
        };
        let sink = JsonLinesAuditSink::new_with_capacity(Box::new(writer), 4);
        let ev = mk_event(Outcome::Success);
        (0..200).for_each(|_| sink.on_call(&ev));
        assert!(
            sink.drops() > 0,
            "expected drops at capacity=4 with a 200-event burst against a slow drain, got 0"
        );
    }

    #[tokio::test]
    async fn events_eventually_drain_to_writer() {
        let buf = shared_buffer();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        let ev = mk_event(Outcome::Success);
        (0..10).for_each(|_| sink.on_call(&ev));

        let out = wait_for_lines(&buf, 10, Duration::from_millis(500)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
    }

    #[tokio::test]
    async fn dropping_sink_drains_pending_then_exits() {
        let buf = shared_buffer();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        for _ in 0..5 {
            sink.on_call(&mk_event(Outcome::Success));
        }
        // Dropping the sink closes the queue and joins the drain thread.
        drop(sink);

        let out = wait_for_lines(&buf, 5, Duration::from_millis(500)).await;
        let text = std::str::from_utf8(&out).unwrap();
        let line_total = text.split_terminator('\n').count();
        assert_eq!(line_total, 5, "drop should flush the last 5 events");
    }

    // ---------------------------------------------------------------------
    // Backpressure strategies
    // ---------------------------------------------------------------------

    #[test]
    fn bare_sink_defaults_to_drop_strategy() {
        struct Bare;
        impl AuditSink for Bare {
            fn on_call(&self, _: &CallEvent) {}
        }
        let strategy = Bare.backpressure_strategy();
        assert!(matches!(strategy, BackpressureStrategy::Drop));
    }

    #[test]
    fn with_strategy_block_reports_block() {
        let writer = SharedBuf(shared_buffer());
        let sink =
            JsonLinesAuditSink::with_strategy(Box::new(writer), 16, BackpressureStrategy::Block);
        let strategy = sink.backpressure_strategy();
        assert!(matches!(strategy, BackpressureStrategy::Block));
    }

    #[test]
    fn block_strategy_loses_nothing_under_burst() {
        let buf = shared_buffer();
        let writer = SlowBuf {
            inner: Arc::clone(&buf),
            delay: Duration::from_micros(50),
        };
        let sink =
            JsonLinesAuditSink::with_strategy(Box::new(writer), 4, BackpressureStrategy::Block);
        let ev = mk_event(Outcome::Success);
        (0..100).for_each(|_| sink.on_call(&ev));
        assert_eq!(sink.drops(), 0, "Block strategy must never drop");

        // Drop joins the drain thread, so every queued event is written by now.
        drop(sink);
        let n = newline_count(&buf.lock().unwrap());
        assert_eq!(
            n, 100,
            "Block must flush all 100 events by the time drop returns"
        );
    }

    #[test]
    fn fallback_strategy_routes_overflow_to_fallback() {
        /// Fallback sink that only counts the events it receives.
        struct CountSink(Arc<AtomicU64>);
        impl AuditSink for CountSink {
            fn on_call(&self, _: &CallEvent) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
        }

        let fb_count = Arc::new(AtomicU64::new(0));
        let fallback: Arc<dyn AuditSink> = Arc::new(CountSink(Arc::clone(&fb_count)));
        let writer = SlowBuf {
            inner: shared_buffer(),
            delay: Duration::from_millis(5),
        };
        let sink = JsonLinesAuditSink::with_strategy(
            Box::new(writer),
            1,
            BackpressureStrategy::FallbackSink(fallback),
        );
        let ev = mk_event(Outcome::Success);
        (0..50).for_each(|_| sink.on_call(&ev));

        assert_eq!(sink.drops(), 0, "fallback caught overflow; primary drops 0");
        let rerouted = fb_count.load(Ordering::Relaxed);
        assert!(rerouted > 0, "fallback sink must catch the overflow events");
    }

    // ---------------------------------------------------------------------
    // Timestamp helper
    // ---------------------------------------------------------------------

    #[test]
    fn now_rfc3339_format_is_parseable() {
        let parsed = chrono::DateTime::parse_from_rfc3339(&now_rfc3339());
        parsed.expect("RFC 3339 parseable");
    }
}