1use chrono::Utc;
15use serde::{Deserialize, Serialize};
16use std::io::Write;
17use std::path::Path;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
/// Version number of the audit-event schema.
///
/// The tests in this file copy this value into `CallEvent::schema_version`;
/// presumably production callers do the same — verify against call sites.
pub const SCHEMA_VERSION: u32 = 2;
31
/// One audit record describing a single tool call.
///
/// The type is (de)serializable with serde; `JsonLinesAuditSink` writes each
/// event as one JSON object per line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CallEvent {
    /// Timestamp of the call as a string. The tests in this file fill it with
    /// `now_rfc3339()`; the type itself does not enforce a format.
    pub ts: String,
    /// Identifier of this individual call.
    pub call_id: String,
    /// Identifier of the tool that was called.
    pub tool_id: String,
    /// Identifier of the caller, if any. Omitted from the serialized output
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub caller_id: Option<String>,
    /// Capability names granted for this call (always serialized, even when empty).
    pub granted_capabilities: Vec<String>,
    /// Duration of the call; milliseconds, per the field name.
    pub duration_ms: u64,
    /// How the call ended; see [`Outcome`].
    pub outcome: Outcome,
    /// Tier label as a free-form string (the tests use `"warm"`).
    /// NOTE(review): the set of valid values is not visible here — confirm.
    pub tier: String,
    /// Dry-run flag. NOTE(review): presumably `true` means the call was only
    /// simulated — confirm against the code that builds events.
    pub dry_run: bool,
    /// Schema version of this record; the tests set it to `SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Secrets-resolution flag. Defaults to `false` when the field is missing
    /// from the input being deserialized.
    #[serde(default)]
    pub secrets_resolved: bool,
    /// Optional cursor page number. Defaults to `None` when missing on input
    /// and is omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cursor_page: Option<u32>,
}
63
/// Result of a tool call as recorded in a [`CallEvent`].
///
/// Serialized as an internally tagged object: the variant name is written in
/// `snake_case` under the `"kind"` key (for example `{"kind":"success"}` or
/// `{"kind":"capability_denied","missing":[...]}`), with the variant's fields
/// alongside it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Outcome {
    /// The call completed successfully.
    Success,
    /// The call failed while executing; carries an error `code` string and a
    /// `retryable` flag.
    ExecutionFailed { code: String, retryable: bool },
    /// The supplied arguments were rejected; `message` describes why.
    InvalidArgs { message: String },
    /// The call was denied; `missing` lists the capability names that were lacking.
    CapabilityDenied { missing: Vec<String> },
    /// The call was rate limited. `retry_after_ms` is serialized as `null`
    /// when `None` (the field is not skipped).
    RateLimited { retry_after_ms: Option<u64> },
    /// The requested tool does not exist.
    ToolNotFound,
}
75
/// Destination for audit events.
///
/// Implementations must be `Send + Sync` so that a sink can be shared across
/// threads.
pub trait AuditSink: Send + Sync {
    /// Records one call event. The event is borrowed; an implementation that
    /// needs to keep it must clone it.
    fn on_call(&self, event: &CallEvent);

    /// Number of events this sink has discarded instead of recording.
    ///
    /// The default implementation returns `0`, which suits sinks that never
    /// drop events.
    fn drops(&self) -> u64 {
        0
    }
}
90
/// Queue capacity (in events) that `JsonLinesAuditSink::new` passes to
/// `JsonLinesAuditSink::new_with_capacity`.
pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1024;
95
/// Audit sink that writes each [`CallEvent`] as one JSON line.
///
/// Events are handed to a background Tokio task through a bounded channel;
/// `on_call` uses `try_send`, so an event is discarded (and counted) rather
/// than waiting when the channel cannot accept it.
pub struct JsonLinesAuditSink {
    /// Sending half of the bounded channel that feeds the writer task.
    tx: tokio::sync::mpsc::Sender<CallEvent>,
    /// Count of events that `try_send` rejected.
    drops: Arc<AtomicU64>,
}
110
111impl JsonLinesAuditSink {
112 pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
115 Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
116 }
117
118 pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
122 let (tx, mut rx) = tokio::sync::mpsc::channel::<CallEvent>(capacity);
123 let drops = Arc::new(AtomicU64::new(0));
124 let mut writer = writer;
125 tokio::spawn(async move {
126 while let Some(ev) = rx.recv().await {
127 if let Ok(mut line) = serde_json::to_vec(&ev) {
128 line.push(b'\n');
129 let _ = writer.write_all(&line);
130 let _ = writer.flush();
131 }
132 }
133 let _ = writer.flush();
136 });
137 Self { tx, drops }
138 }
139
140 pub fn stdout() -> Self {
141 Self::new(Box::new(std::io::stdout()))
142 }
143
144 pub fn stderr() -> Self {
145 Self::new(Box::new(std::io::stderr()))
146 }
147
148 pub fn file(path: &Path) -> std::io::Result<Self> {
150 let f = std::fs::OpenOptions::new()
151 .create(true)
152 .append(true)
153 .open(path)?;
154 Ok(Self::new(Box::new(f)))
155 }
156
157 pub fn drops(&self) -> u64 {
161 self.drops.load(Ordering::Relaxed)
162 }
163}
164
165impl AuditSink for JsonLinesAuditSink {
166 fn on_call(&self, event: &CallEvent) {
167 match self.tx.try_send(event.clone()) {
168 Ok(()) => {}
169 Err(_) => {
172 self.drops.fetch_add(1, Ordering::Relaxed);
173 }
174 }
175 }
176 fn drops(&self) -> u64 {
177 self.drops.load(Ordering::Relaxed)
178 }
179}
180
181pub fn now_rfc3339() -> String {
185 Utc::now().to_rfc3339()
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    /// Builds a fully populated `CallEvent` carrying the given `outcome`.
    fn mk_event(outcome: Outcome) -> CallEvent {
        CallEvent {
            ts: now_rfc3339(),
            call_id: String::from("01J000000000000000000000TEST"),
            tool_id: String::from("ref:echo.say"),
            caller_id: Some(String::from("test-client")),
            granted_capabilities: vec![String::from("read"), String::from("write")],
            duration_ms: 17,
            outcome,
            tier: String::from("warm"),
            dry_run: false,
            schema_version: SCHEMA_VERSION,
            secrets_resolved: false,
            cursor_page: None,
        }
    }

    /// Round-trips `event` through its JSON bytes into a generic JSON value.
    fn to_json(event: &CallEvent) -> serde_json::Value {
        let bytes = serde_json::to_vec(event).unwrap();
        serde_json::from_slice(&bytes).unwrap()
    }

    /// Creates an empty byte buffer that a test and a `SharedBuf` can share.
    fn new_buf() -> Arc<Mutex<Vec<u8>>> {
        Arc::new(Mutex::new(Vec::<u8>::new()))
    }

    #[test]
    fn success_event_serializes() {
        let event = mk_event(Outcome::Success);
        let bytes = serde_json::to_vec(&event).expect("serialize");
        let j: serde_json::Value = serde_json::from_slice(&bytes).expect("parse");
        assert_eq!(j["tool_id"], "ref:echo.say");
        assert_eq!(j["outcome"]["kind"], "success");
        assert_eq!(j["schema_version"], 2);
        assert_eq!(j["dry_run"], false);
    }

    #[test]
    fn capability_denied_outcome_tagged_correctly() {
        let outcome = Outcome::CapabilityDenied {
            missing: vec![String::from("conformance.denied")],
        };
        let j = to_json(&mk_event(outcome));
        assert_eq!(j["outcome"]["kind"], "capability_denied");
        assert_eq!(j["outcome"]["missing"][0], "conformance.denied");
    }

    #[test]
    fn execution_failed_carries_code_and_retryable() {
        let outcome = Outcome::ExecutionFailed {
            code: String::from("FS_NOT_FOUND"),
            retryable: false,
        };
        let j = to_json(&mk_event(outcome));
        assert_eq!(j["outcome"]["kind"], "execution_failed");
        assert_eq!(j["outcome"]["code"], "FS_NOT_FOUND");
        assert_eq!(j["outcome"]["retryable"], false);
    }

    #[test]
    fn rate_limited_outcome_with_null_retry_after() {
        let outcome = Outcome::RateLimited {
            retry_after_ms: None,
        };
        let j = to_json(&mk_event(outcome));
        assert_eq!(j["outcome"]["kind"], "rate_limited");
        assert!(j["outcome"]["retry_after_ms"].is_null());
    }

    #[test]
    fn caller_id_skipped_when_none() {
        let event = CallEvent {
            caller_id: None,
            ..mk_event(Outcome::Success)
        };
        let rendered = serde_json::to_string(&event).unwrap();
        assert!(
            !rendered.contains("caller_id"),
            "caller_id None should be skipped, got: {}",
            rendered
        );
    }

    /// `Write` adapter that appends into a buffer the test can inspect later.
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);

    impl Write for SharedBuf {
        fn write(&mut self, bytes: &[u8]) -> std::io::Result<usize> {
            let mut sink = self.0.lock().unwrap();
            sink.extend_from_slice(bytes);
            Ok(bytes.len())
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Polls `buf` every 5 ms until it holds at least `target_lines` newline
    /// bytes or `timeout` has elapsed, then returns a copy of its contents.
    async fn wait_for_lines(
        buf: &Arc<Mutex<Vec<u8>>>,
        target_lines: usize,
        timeout: Duration,
    ) -> Vec<u8> {
        let deadline = Instant::now() + timeout;
        loop {
            // The lock guard lives only inside this block, so it is released
            // before the task sleeps.
            let snapshot = {
                let guard = buf.lock().unwrap();
                let newline_count = guard.iter().filter(|&&byte| byte == b'\n').count();
                if newline_count >= target_lines || Instant::now() > deadline {
                    Some(guard.clone())
                } else {
                    None
                }
            };
            if let Some(bytes) = snapshot {
                return bytes;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test]
    async fn json_lines_sink_writes_one_line_per_event() {
        let buf = new_buf();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        sink.on_call(&mk_event(Outcome::Success));
        sink.on_call(&mk_event(Outcome::ToolNotFound));

        let out = wait_for_lines(&buf, 2, Duration::from_millis(500)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
        for raw in lines.iter().copied() {
            let _: CallEvent = serde_json::from_str(raw).expect("each line parses as CallEvent");
        }
    }

    #[tokio::test]
    async fn on_call_is_non_blocking_under_burst() {
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(new_buf())));
        let ev = mk_event(Outcome::Success);
        let started = Instant::now();
        (0..100).for_each(|_| sink.on_call(&ev));
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(50),
            "100 on_call invocations took {elapsed:?}; expected <50ms"
        );
    }

    #[tokio::test]
    async fn drops_counter_increments_when_channel_full() {
        let sink = JsonLinesAuditSink::new_with_capacity(Box::new(SharedBuf(new_buf())), 4);
        let ev = mk_event(Outcome::Success);
        (0..200).for_each(|_| sink.on_call(&ev));
        assert!(
            sink.drops() > 0,
            "expected some drops at capacity=4 with 200-event burst, got 0"
        );
    }

    #[tokio::test]
    async fn events_eventually_drain_to_writer() {
        let buf = new_buf();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        let ev = mk_event(Outcome::Success);
        (0..10).for_each(|_| sink.on_call(&ev));

        let out = wait_for_lines(&buf, 10, Duration::from_millis(500)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
    }

    #[tokio::test]
    async fn dropping_sink_drains_pending_then_exits() {
        let buf = new_buf();
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(Arc::clone(&buf))));
        for _ in 0..5 {
            sink.on_call(&mk_event(Outcome::Success));
        }
        // Drop the sink before waiting, so the channel's only sender is gone
        // while events are still queued.
        drop(sink);

        let out = wait_for_lines(&buf, 5, Duration::from_millis(500)).await;
        let text = std::str::from_utf8(&out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 5, "drop should flush the last 5 events");
    }

    #[test]
    fn now_rfc3339_format_is_parseable() {
        let stamp = now_rfc3339();
        let parsed = chrono::DateTime::parse_from_rfc3339(&stamp);
        parsed.expect("RFC 3339 parseable");
    }
}