1use chrono::Utc;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::Path;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26pub const SCHEMA_VERSION: u32 = 3;
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
47#[non_exhaustive]
48pub struct CallEvent {
49 pub ts: String,
50 pub call_id: String,
51 pub tool_id: String,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub caller_id: Option<String>,
54 pub granted_capabilities: Vec<String>,
55 pub duration_ms: u64,
56 pub outcome: Outcome,
57 pub tier: String,
58 pub dry_run: bool,
59 pub schema_version: u32,
60 #[serde(default)]
66 pub secrets_resolved: bool,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
72 pub cursor_page: Option<u32>,
73 #[serde(default, skip_serializing_if = "Option::is_none")]
80 pub capability_provenance: Option<Vec<CapProvenance>>,
81}
82
83impl CallEvent {
84 pub fn new(
95 ts: impl Into<String>,
96 call_id: impl Into<String>,
97 tool_id: impl Into<String>,
98 duration_ms: u64,
99 outcome: Outcome,
100 tier: impl Into<String>,
101 ) -> Self {
102 Self {
103 ts: ts.into(),
104 call_id: call_id.into(),
105 tool_id: tool_id.into(),
106 caller_id: None,
107 granted_capabilities: Vec::new(),
108 duration_ms,
109 outcome,
110 tier: tier.into(),
111 dry_run: false,
112 schema_version: SCHEMA_VERSION,
113 secrets_resolved: false,
114 cursor_page: None,
115 capability_provenance: None,
116 }
117 }
118
119 pub fn with_caller_id(mut self, caller_id: Option<String>) -> Self {
120 self.caller_id = caller_id;
121 self
122 }
123 pub fn with_granted_capabilities(mut self, caps: Vec<String>) -> Self {
124 self.granted_capabilities = caps;
125 self
126 }
127 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
128 self.dry_run = dry_run;
129 self
130 }
131 pub fn with_secrets_resolved(mut self, resolved: bool) -> Self {
132 self.secrets_resolved = resolved;
133 self
134 }
135 pub fn with_cursor_page(mut self, page: Option<u32>) -> Self {
136 self.cursor_page = page;
137 self
138 }
139 pub fn with_capability_provenance(mut self, provenance: Option<Vec<CapProvenance>>) -> Self {
140 self.capability_provenance = provenance;
141 self
142 }
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
149pub struct CapProvenance {
150 pub cap: String,
151 pub source: ProvSource,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
157#[serde(tag = "kind", rename_all = "snake_case")]
158pub enum ProvSource {
159 StringAllowList,
162 UcanChain { issuer_did: String, chain_depth: u8 },
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(tag = "kind", rename_all = "snake_case")]
170pub enum Outcome {
171 Success,
172 ExecutionFailed { code: String, retryable: bool },
173 InvalidArgs { message: String },
174 CapabilityDenied { missing: Vec<String> },
175 RateLimited { retry_after_ms: Option<u64> },
176 ToolNotFound,
177}
178
179#[derive(Clone)]
182pub enum BackpressureStrategy {
183 Drop,
187 Block,
193 FallbackSink(Arc<dyn AuditSink>),
199}
200
201impl std::fmt::Debug for BackpressureStrategy {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 match self {
204 BackpressureStrategy::Drop => f.write_str("Drop"),
205 BackpressureStrategy::Block => f.write_str("Block"),
206 BackpressureStrategy::FallbackSink(_) => f.write_str("FallbackSink(..)"),
207 }
208 }
209}
210
211pub trait AuditSink: Send + Sync {
215 fn on_call(&self, event: &CallEvent);
216 fn drops(&self) -> u64 {
223 0
224 }
225 fn backpressure_strategy(&self) -> BackpressureStrategy {
230 BackpressureStrategy::Drop
231 }
232}
233
234pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1024;
238
239pub struct JsonLinesAuditSink {
255 tx: Option<std::sync::mpsc::SyncSender<CallEvent>>,
258 drain: Option<std::thread::JoinHandle<()>>,
261 drops: Arc<AtomicU64>,
262 strategy: BackpressureStrategy,
263}
264
265impl JsonLinesAuditSink {
266 pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
269 Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
270 }
271
272 pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
274 Self::with_strategy(writer, capacity, BackpressureStrategy::Drop)
275 }
276
277 pub fn with_strategy(
281 writer: Box<dyn Write + Send + 'static>,
282 capacity: usize,
283 strategy: BackpressureStrategy,
284 ) -> Self {
285 if matches!(strategy, BackpressureStrategy::Block) {
290 if let Ok(h) = tokio::runtime::Handle::try_current() {
291 if h.runtime_flavor() == tokio::runtime::RuntimeFlavor::CurrentThread {
292 eprintln!(
293 "atd: WARNING — JsonLinesAuditSink Block strategy on a \
294 current_thread runtime; a blocked worker can stall accept \
295 under audit backpressure. Prefer a multi-thread runtime."
296 );
297 }
298 }
299 }
300 let (tx, rx) = std::sync::mpsc::sync_channel::<CallEvent>(capacity);
301 let drops = Arc::new(AtomicU64::new(0));
302 let mut writer = writer;
303 let drain = std::thread::spawn(move || {
304 while let Ok(ev) = rx.recv() {
305 if let Ok(mut line) = serde_json::to_vec(&ev) {
306 line.push(b'\n');
307 let _ = writer.write_all(&line);
308 let _ = writer.flush();
309 }
310 }
311 let _ = writer.flush();
313 });
314 Self {
315 tx: Some(tx),
316 drain: Some(drain),
317 drops,
318 strategy,
319 }
320 }
321
322 pub fn stdout() -> Self {
323 Self::new(Box::new(std::io::stdout()))
324 }
325
326 pub fn stderr() -> Self {
327 Self::new(Box::new(std::io::stderr()))
328 }
329
330 pub fn file(path: &Path) -> std::io::Result<Self> {
332 let f = std::fs::OpenOptions::new()
333 .create(true)
334 .append(true)
335 .open(path)?;
336 Ok(Self::new(Box::new(f)))
337 }
338
339 pub fn drops(&self) -> u64 {
342 self.drops.load(Ordering::Relaxed)
343 }
344}
345
346impl AuditSink for JsonLinesAuditSink {
347 fn on_call(&self, event: &CallEvent) {
348 let Some(tx) = self.tx.as_ref() else {
349 self.drops.fetch_add(1, Ordering::Relaxed);
351 return;
352 };
353 match &self.strategy {
354 BackpressureStrategy::Drop => {
355 if tx.try_send(event.clone()).is_err() {
357 self.drops.fetch_add(1, Ordering::Relaxed);
358 }
359 }
360 BackpressureStrategy::Block => {
361 if tx.send(event.clone()).is_err() {
365 self.drops.fetch_add(1, Ordering::Relaxed);
366 }
367 }
368 BackpressureStrategy::FallbackSink(fb) => {
369 if tx.try_send(event.clone()).is_err() {
370 fb.on_call(event);
371 }
372 }
373 }
374 }
375 fn drops(&self) -> u64 {
376 self.drops.load(Ordering::Relaxed)
377 }
378 fn backpressure_strategy(&self) -> BackpressureStrategy {
379 self.strategy.clone()
380 }
381}
382
383impl Drop for JsonLinesAuditSink {
384 fn drop(&mut self) {
391 self.tx.take(); if let Some(h) = self.drain.take() {
393 let _ = h.join();
394 }
395 }
396}
397
398pub fn now_rfc3339() -> String {
402 Utc::now().to_rfc3339()
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use std::sync::Mutex;
409
410 fn mk_event(outcome: Outcome) -> CallEvent {
411 CallEvent::new(
412 now_rfc3339(),
413 "01J000000000000000000000TEST",
414 "ref:echo.say",
415 17,
416 outcome,
417 "warm",
418 )
419 .with_caller_id(Some("test-client".into()))
420 .with_granted_capabilities(vec!["read".into(), "write".into()])
421 }
422
423 #[test]
424 fn callevent_builder_defaults_then_setters() {
425 let e = CallEvent::new(now_rfc3339(), "cid", "tool:x", 5, Outcome::Success, "warm");
426 assert_eq!(e.tool_id, "tool:x");
427 assert_eq!(e.duration_ms, 5);
428 assert_eq!(e.schema_version, SCHEMA_VERSION);
429 assert!(e.caller_id.is_none());
430 assert!(e.granted_capabilities.is_empty());
431 assert!(!e.dry_run);
432 assert!(!e.secrets_resolved);
433 assert!(e.cursor_page.is_none());
434 assert!(e.capability_provenance.is_none());
435 let e2 = e
436 .with_caller_id(Some("agent-A".into()))
437 .with_cursor_page(Some(2))
438 .with_secrets_resolved(true);
439 assert_eq!(e2.caller_id.as_deref(), Some("agent-A"));
440 assert_eq!(e2.cursor_page, Some(2));
441 assert!(e2.secrets_resolved);
442 }
443
444 #[test]
445 fn success_event_serializes() {
446 let e = mk_event(Outcome::Success);
447 let j: serde_json::Value =
448 serde_json::from_slice(&serde_json::to_vec(&e).expect("serialize")).expect("parse");
449 assert_eq!(j["tool_id"], "ref:echo.say");
450 assert_eq!(j["outcome"]["kind"], "success");
451 assert_eq!(j["schema_version"], 3);
452 assert_eq!(j["dry_run"], false);
453 }
454
455 #[test]
456 fn capability_denied_outcome_tagged_correctly() {
457 let e = mk_event(Outcome::CapabilityDenied {
458 missing: vec!["conformance.denied".into()],
459 });
460 let j: serde_json::Value =
461 serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
462 assert_eq!(j["outcome"]["kind"], "capability_denied");
463 assert_eq!(j["outcome"]["missing"][0], "conformance.denied");
464 }
465
466 #[test]
467 fn execution_failed_carries_code_and_retryable() {
468 let e = mk_event(Outcome::ExecutionFailed {
469 code: "FS_NOT_FOUND".into(),
470 retryable: false,
471 });
472 let j: serde_json::Value =
473 serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
474 assert_eq!(j["outcome"]["kind"], "execution_failed");
475 assert_eq!(j["outcome"]["code"], "FS_NOT_FOUND");
476 assert_eq!(j["outcome"]["retryable"], false);
477 }
478
479 #[test]
480 fn rate_limited_outcome_with_null_retry_after() {
481 let e = mk_event(Outcome::RateLimited {
482 retry_after_ms: None,
483 });
484 let j: serde_json::Value =
485 serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
486 assert_eq!(j["outcome"]["kind"], "rate_limited");
487 assert!(j["outcome"]["retry_after_ms"].is_null());
488 }
489
490 #[test]
493 fn capability_provenance_roundtrips_both_sources() {
494 let mut e = mk_event(Outcome::Success);
495 e.capability_provenance = Some(vec![
496 CapProvenance {
497 cap: "records:read".into(),
498 source: ProvSource::StringAllowList,
499 },
500 CapProvenance {
501 cap: "records:write".into(),
502 source: ProvSource::UcanChain {
503 issuer_did: "did:key:zABC".into(),
504 chain_depth: 1,
505 },
506 },
507 ]);
508 let j: serde_json::Value =
509 serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
510 let prov = j["capability_provenance"].as_array().unwrap();
511 assert_eq!(prov[0]["cap"], "records:read");
512 assert_eq!(prov[0]["source"]["kind"], "string_allow_list");
513 assert_eq!(prov[1]["source"]["kind"], "ucan_chain");
514 assert_eq!(prov[1]["source"]["issuer_did"], "did:key:zABC");
515 assert_eq!(prov[1]["source"]["chain_depth"], 1);
516 }
517
518 #[test]
519 fn provenance_skipped_when_none() {
520 let e = mk_event(Outcome::Success);
521 let s = serde_json::to_string(&e).unwrap();
522 assert!(
523 !s.contains("capability_provenance"),
524 "None provenance must be omitted on the wire (back-compat), got: {s}"
525 );
526 }
527
528 #[test]
529 fn v2_event_without_provenance_deserializes_to_none() {
530 let j = r#"{"ts":"2026-05-29T00:00:00+00:00","call_id":"01J","tool_id":"x",
533 "granted_capabilities":[],"duration_ms":1,"outcome":{"kind":"success"},
534 "tier":"warm","dry_run":false,"schema_version":2,"secrets_resolved":false}"#;
535 let e: CallEvent = serde_json::from_str(j).unwrap();
536 assert!(e.capability_provenance.is_none());
537 assert!(e.cursor_page.is_none());
538 }
539
540 #[test]
541 fn caller_id_skipped_when_none() {
542 let mut e = mk_event(Outcome::Success);
543 e.caller_id = None;
544 let s = serde_json::to_string(&e).unwrap();
545 assert!(
546 !s.contains("caller_id"),
547 "caller_id None should be skipped, got: {}",
548 s
549 );
550 }
551
552 struct SharedBuf(Arc<Mutex<Vec<u8>>>);
557 impl Write for SharedBuf {
558 fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
559 self.0.lock().unwrap().extend_from_slice(bs);
560 Ok(bs.len())
561 }
562 fn flush(&mut self) -> std::io::Result<()> {
563 Ok(())
564 }
565 }
566
567 async fn wait_for_lines(
570 buf: &Arc<Mutex<Vec<u8>>>,
571 target_lines: usize,
572 timeout: std::time::Duration,
573 ) -> Vec<u8> {
574 let deadline = std::time::Instant::now() + timeout;
575 loop {
576 {
577 let guard = buf.lock().unwrap();
578 let count = guard.iter().filter(|b| **b == b'\n').count();
579 if count >= target_lines || std::time::Instant::now() > deadline {
580 return guard.clone();
581 }
582 }
583 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
584 }
585 }
586
587 #[tokio::test]
588 async fn json_lines_sink_writes_one_line_per_event() {
589 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
590 let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
591 sink.on_call(&mk_event(Outcome::Success));
592 sink.on_call(&mk_event(Outcome::ToolNotFound));
593
594 let out = wait_for_lines(&buf, 2, std::time::Duration::from_millis(500)).await;
595 let text = String::from_utf8(out).unwrap();
596 let lines: Vec<&str> = text.split_terminator('\n').collect();
597 assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
598 for line in &lines {
599 let _: CallEvent = serde_json::from_str(line).expect("each line parses as CallEvent");
600 }
601 }
602
603 #[tokio::test]
606 async fn on_call_is_non_blocking_under_burst() {
607 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
608 let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf)));
609 let ev = mk_event(Outcome::Success);
610 let started = std::time::Instant::now();
614 for _ in 0..100 {
615 sink.on_call(&ev);
616 }
617 let elapsed = started.elapsed();
618 assert!(
619 elapsed < std::time::Duration::from_millis(50),
620 "100 on_call invocations took {elapsed:?}; expected <50ms"
621 );
622 }
623
624 #[test]
625 fn drops_counter_increments_when_channel_full() {
626 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
631 let sink = JsonLinesAuditSink::new_with_capacity(
632 Box::new(SlowBuf {
633 inner: buf,
634 delay: std::time::Duration::from_millis(2),
635 }),
636 4,
637 );
638 let ev = mk_event(Outcome::Success);
639 for _ in 0..200 {
640 sink.on_call(&ev);
641 }
642 assert!(
643 sink.drops() > 0,
644 "expected drops at capacity=4 with a 200-event burst against a slow drain, got 0"
645 );
646 }
647
648 #[tokio::test]
649 async fn events_eventually_drain_to_writer() {
650 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
651 let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
652 let ev = mk_event(Outcome::Success);
653 for _ in 0..10 {
654 sink.on_call(&ev);
655 }
656 let out = wait_for_lines(&buf, 10, std::time::Duration::from_millis(500)).await;
657 let text = String::from_utf8(out).unwrap();
658 let lines: Vec<&str> = text.split_terminator('\n').collect();
659 assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
660 }
661
662 #[tokio::test]
663 async fn dropping_sink_drains_pending_then_exits() {
664 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
665 {
666 let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
667 for _ in 0..5 {
668 sink.on_call(&mk_event(Outcome::Success));
669 }
670 }
672 let out = wait_for_lines(&buf, 5, std::time::Duration::from_millis(500)).await;
674 let lines: Vec<&str> = std::str::from_utf8(&out)
675 .unwrap()
676 .split_terminator('\n')
677 .collect();
678 assert_eq!(lines.len(), 5, "drop should flush the last 5 events");
679 }
680
681 struct SlowBuf {
686 inner: Arc<Mutex<Vec<u8>>>,
687 delay: std::time::Duration,
688 }
689 impl Write for SlowBuf {
690 fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
691 std::thread::sleep(self.delay);
692 self.inner.lock().unwrap().extend_from_slice(bs);
693 Ok(bs.len())
694 }
695 fn flush(&mut self) -> std::io::Result<()> {
696 Ok(())
697 }
698 }
699
700 #[test]
701 fn bare_sink_defaults_to_drop_strategy() {
702 struct Bare;
703 impl AuditSink for Bare {
704 fn on_call(&self, _: &CallEvent) {}
705 }
706 assert!(matches!(
707 Bare.backpressure_strategy(),
708 BackpressureStrategy::Drop
709 ));
710 }
711
712 #[test]
713 fn with_strategy_block_reports_block() {
714 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
715 let sink = JsonLinesAuditSink::with_strategy(
716 Box::new(SharedBuf(buf)),
717 16,
718 BackpressureStrategy::Block,
719 );
720 assert!(matches!(
721 sink.backpressure_strategy(),
722 BackpressureStrategy::Block
723 ));
724 }
725
726 #[test]
727 fn block_strategy_loses_nothing_under_burst() {
728 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
734 let sink = JsonLinesAuditSink::with_strategy(
735 Box::new(SlowBuf {
736 inner: buf.clone(),
737 delay: std::time::Duration::from_micros(50),
738 }),
739 4,
740 BackpressureStrategy::Block,
741 );
742 let ev = mk_event(Outcome::Success);
743 for _ in 0..100 {
744 sink.on_call(&ev);
745 }
746 assert_eq!(sink.drops(), 0, "Block strategy must never drop");
747 drop(sink); let n = buf.lock().unwrap().iter().filter(|b| **b == b'\n').count();
749 assert_eq!(
750 n, 100,
751 "Block must flush all 100 events by the time drop returns"
752 );
753 }
754
755 #[test]
756 fn fallback_strategy_routes_overflow_to_fallback() {
757 struct CountSink(Arc<AtomicU64>);
758 impl AuditSink for CountSink {
759 fn on_call(&self, _: &CallEvent) {
760 self.0.fetch_add(1, Ordering::Relaxed);
761 }
762 }
763 let fb_count = Arc::new(AtomicU64::new(0));
764 let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
765 let sink = JsonLinesAuditSink::with_strategy(
766 Box::new(SlowBuf {
767 inner: buf,
768 delay: std::time::Duration::from_millis(5),
769 }),
770 1,
771 BackpressureStrategy::FallbackSink(Arc::new(CountSink(fb_count.clone()))),
772 );
773 let ev = mk_event(Outcome::Success);
774 for _ in 0..50 {
775 sink.on_call(&ev);
776 }
777 assert_eq!(sink.drops(), 0, "fallback caught overflow; primary drops 0");
778 assert!(
779 fb_count.load(Ordering::Relaxed) > 0,
780 "fallback sink must catch the overflow events"
781 );
782 }
783
784 #[test]
785 fn now_rfc3339_format_is_parseable() {
786 let s = now_rfc3339();
787 chrono::DateTime::parse_from_rfc3339(&s).expect("RFC 3339 parseable");
788 }
789}