Skip to main content

atd_runtime/
audit.rs

//! Structured per-call audit events + pluggable sinks.
//!
//! `AuditSink` is the observation hook called at dispatch return points.
//! It sits OUTSIDE `Middleware` (which is a result-rewriter, success-only)
//! because audit needs to observe every outcome, including failures.
//!
//! `JsonLinesAuditSink` is the default sink shipped in v1: one JSON
//! object per line. SP-concurrency-baseline §5.4: an internal bounded
//! `tokio::sync::mpsc` channel plus a dedicated drain task decouple the
//! dispatch hot path from synchronous file I/O, eliminating the §1.3
//! secondary cliff (mutex-blocked reactor stall at ~50 concurrent
//! dispatches per second). Construction requires a tokio runtime context.

14use chrono::Utc;
15use serde::{Deserialize, Serialize};
16use std::io::Write;
17use std::path::Path;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
/// Version number stamped into every audit event's `schema_version` field.
/// Consumers that need to react to future breaking changes should branch on it.
///
/// History:
/// - `1` (SP-operability-v1): first stable schema.
/// - `2` (SP-pagination-v1): introduces the optional `cursor_page` field,
///   declared `#[serde(default, skip_serializing_if = "Option::is_none")]`.
///   v1 consumers therefore tolerate v2 events (as long as they ignore unknown
///   fields, which is serde's default), and v2 consumers decode v1 events with
///   `cursor_page: None`. The bump records when the field arrived; the event
///   shape did not change incompatibly.
pub const SCHEMA_VERSION: u32 = 2;

32/// One per-call audit event. Emitted at every `Request::RunTool`
33/// return point (success, invalid_args, execution_failed, cap_denied,
34/// rate_limited, tool_not_found). Ping / Hello / ToolList / ToolSchema
35/// do NOT emit events in v1.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct CallEvent {
38    pub ts: String,
39    pub call_id: String,
40    pub tool_id: String,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub caller_id: Option<String>,
43    pub granted_capabilities: Vec<String>,
44    pub duration_ms: u64,
45    pub outcome: Outcome,
46    pub tier: String,
47    pub dry_run: bool,
48    pub schema_version: u32,
49    /// `true` iff a `TokenBroker` was configured AND it returned
50    /// `Ok(Some(_))` for this caller (SP-token-broker-phase1). Always
51    /// `false` for early-return paths (capability denied, dry-run,
52    /// rate-limited, tool-not-found) and for servers without a broker.
53    /// No key names or values are recorded.
54    #[serde(default)]
55    pub secrets_resolved: bool,
56    /// SP-pagination-v1 — 1-based page index for paginated calls. `None`
57    /// for non-paginated dispatches (the vast majority of events; saves
58    /// bytes in the audit log). `Some(1)` for the initial `RunTool` that
59    /// returned a cursor; `Some(2..)` for each `RunToolContinue`.
60    #[serde(default, skip_serializing_if = "Option::is_none")]
61    pub cursor_page: Option<u32>,
62}
63
64/// Outcome variants cover the full dispatch-return space for RunTool.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(tag = "kind", rename_all = "snake_case")]
67pub enum Outcome {
68    Success,
69    ExecutionFailed { code: String, retryable: bool },
70    InvalidArgs { message: String },
71    CapabilityDenied { missing: Vec<String> },
72    RateLimited { retry_after_ms: Option<u64> },
73    ToolNotFound,
74}
75
76/// Observer hook. Non-blocking: writes happen synchronously to the
77/// sink's own backpressure (no queuing here). Must not panic.
78pub trait AuditSink: Send + Sync {
79    fn on_call(&self, event: &CallEvent);
80    /// Total events dropped because the sink's queue was full. Default `0`
81    /// for sinks that don't queue (custom synchronous adopter impls).
82    /// `JsonLinesAuditSink` overrides with its `Arc<AtomicU64>` counter so
83    /// `Server::metrics_snapshot()` (SP-concurrency-baseline §5.7) can
84    /// surface the count without coupling the metrics module to the
85    /// concrete sink type.
86    fn drops(&self) -> u64 {
87        0
88    }
89}
90
/// Channel capacity used by [`JsonLinesAuditSink::new`].
///
/// Sizing: at roughly 500 bytes per event, a full queue of 1024 events is
/// about 512 KB at peak. The queue empties as fast as the wrapped writer
/// accepts lines (rough estimate for a typical disk: ~10k events/s
/// sustained, with much higher transient bursts).
pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1_024;

96/// SP-concurrency-baseline §5.4. Writes one JSON object per line to the
97/// wrapped writer via a dedicated tokio task. `on_call` is non-blocking
98/// (`try_send`); if the bounded channel is full the event is dropped and
99/// the `audit_drops` counter increments — log loss >> dispatch stall.
100///
101/// **Construction requires a tokio runtime context** because it spawns a
102/// drain task on `new`. All shipped helpers (`stdout` / `stderr` / `file`)
103/// inherit this requirement. Adopters who construct sinks outside async
104/// scope should use `tokio::runtime::Handle::current().block_on(...)` or
105/// build their own sync sink implementing `AuditSink`.
106pub struct JsonLinesAuditSink {
107    tx: tokio::sync::mpsc::Sender<CallEvent>,
108    drops: Arc<AtomicU64>,
109}
110
111impl JsonLinesAuditSink {
112    /// Construct with default capacity (`DEFAULT_AUDIT_QUEUE_CAPACITY`).
113    /// Spawns a tokio task that owns the writer and drains the channel.
114    pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
115        Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
116    }
117
118    /// Construct with an explicit channel capacity. Smaller capacities
119    /// drop sooner under burst load; larger capacities hold more bytes
120    /// in memory at peak.
121    pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
122        let (tx, mut rx) = tokio::sync::mpsc::channel::<CallEvent>(capacity);
123        let drops = Arc::new(AtomicU64::new(0));
124        let mut writer = writer;
125        tokio::spawn(async move {
126            while let Some(ev) = rx.recv().await {
127                if let Ok(mut line) = serde_json::to_vec(&ev) {
128                    line.push(b'\n');
129                    let _ = writer.write_all(&line);
130                    let _ = writer.flush();
131                }
132            }
133            // Channel closed (sink dropped) — final flush so the last batch
134            // hits disk even if the runtime is about to shut down.
135            let _ = writer.flush();
136        });
137        Self { tx, drops }
138    }
139
140    pub fn stdout() -> Self {
141        Self::new(Box::new(std::io::stdout()))
142    }
143
144    pub fn stderr() -> Self {
145        Self::new(Box::new(std::io::stderr()))
146    }
147
148    /// Open `path` for append; creates the file if missing.
149    pub fn file(path: &Path) -> std::io::Result<Self> {
150        let f = std::fs::OpenOptions::new()
151            .create(true)
152            .append(true)
153            .open(path)?;
154        Ok(Self::new(Box::new(f)))
155    }
156
157    /// Count of events dropped because the channel was full when
158    /// `on_call` was invoked. Exposed for the SP-concurrency-baseline
159    /// §G7 metrics snapshot and for adopter dashboards.
160    pub fn drops(&self) -> u64 {
161        self.drops.load(Ordering::Relaxed)
162    }
163}
164
165impl AuditSink for JsonLinesAuditSink {
166    fn on_call(&self, event: &CallEvent) {
167        match self.tx.try_send(event.clone()) {
168            Ok(()) => {}
169            // Channel full or closed — drop the event and bump the counter.
170            // Sync dispatch path must not block; log loss >> dispatch stall.
171            Err(_) => {
172                self.drops.fetch_add(1, Ordering::Relaxed);
173            }
174        }
175    }
176    fn drops(&self) -> u64 {
177        self.drops.load(Ordering::Relaxed)
178    }
179}
180
181/// Produce an RFC 3339 UTC timestamp string suitable for `CallEvent::ts`.
182/// Dispatch sites use this rather than calling chrono directly so the
183/// format stays consistent.
184pub fn now_rfc3339() -> String {
185    Utc::now().to_rfc3339()
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    /// Build a fully-populated event carrying `outcome`.
    fn mk_event(outcome: Outcome) -> CallEvent {
        CallEvent {
            ts: now_rfc3339(),
            call_id: "01J000000000000000000000TEST".into(),
            tool_id: "ref:echo.say".into(),
            caller_id: Some("test-client".into()),
            granted_capabilities: vec!["read".into(), "write".into()],
            duration_ms: 17,
            outcome,
            tier: "warm".into(),
            dry_run: false,
            schema_version: SCHEMA_VERSION,
            secrets_resolved: false,
            cursor_page: None,
        }
    }

    /// Serialize `e` to bytes and parse them back as untyped JSON, so field
    /// assertions exercise the real wire encoding.
    fn to_json(e: &CallEvent) -> serde_json::Value {
        let bytes = serde_json::to_vec(e).expect("serialize");
        serde_json::from_slice(&bytes).expect("parse")
    }

    #[test]
    fn success_event_serializes() {
        let j = to_json(&mk_event(Outcome::Success));
        assert_eq!(j["tool_id"], "ref:echo.say");
        assert_eq!(j["outcome"]["kind"], "success");
        assert_eq!(j["schema_version"], 2);
        assert_eq!(j["dry_run"], false);
    }

    #[test]
    fn capability_denied_outcome_tagged_correctly() {
        let j = to_json(&mk_event(Outcome::CapabilityDenied {
            missing: vec!["conformance.denied".into()],
        }));
        assert_eq!(j["outcome"]["kind"], "capability_denied");
        assert_eq!(j["outcome"]["missing"][0], "conformance.denied");
    }

    #[test]
    fn execution_failed_carries_code_and_retryable() {
        let j = to_json(&mk_event(Outcome::ExecutionFailed {
            code: "FS_NOT_FOUND".into(),
            retryable: false,
        }));
        assert_eq!(j["outcome"]["kind"], "execution_failed");
        assert_eq!(j["outcome"]["code"], "FS_NOT_FOUND");
        assert_eq!(j["outcome"]["retryable"], false);
    }

    #[test]
    fn rate_limited_outcome_with_null_retry_after() {
        let j = to_json(&mk_event(Outcome::RateLimited {
            retry_after_ms: None,
        }));
        assert_eq!(j["outcome"]["kind"], "rate_limited");
        assert!(j["outcome"]["retry_after_ms"].is_null());
    }

    #[test]
    fn caller_id_skipped_when_none() {
        let mut e = mk_event(Outcome::Success);
        e.caller_id = None;
        let s = serde_json::to_string(&e).unwrap();
        assert!(
            !s.contains("caller_id"),
            "caller_id None should be skipped, got: {}",
            s
        );
    }

    #[test]
    fn cursor_page_serialized_only_when_present() {
        let mut e = mk_event(Outcome::Success);
        assert!(to_json(&e).get("cursor_page").is_none());
        e.cursor_page = Some(3);
        assert_eq!(to_json(&e)["cursor_page"], 3);
    }

    /// `SCHEMA_VERSION` docs promise that v2 readers decode v1 events, which
    /// lack the `secrets_resolved` / `cursor_page` fields.
    #[test]
    fn v1_event_without_v2_fields_deserializes() {
        let mut j = to_json(&mk_event(Outcome::Success));
        {
            let obj = j.as_object_mut().expect("event is a JSON object");
            obj.remove("secrets_resolved");
            assert!(!obj.contains_key("cursor_page"));
            obj.insert("schema_version".to_owned(), serde_json::Value::from(1));
        }
        let e: CallEvent = serde_json::from_value(j).expect("v1 event deserializes");
        assert_eq!(e.schema_version, 1);
        assert!(!e.secrets_resolved);
        assert_eq!(e.cursor_page, None);
    }

    /// Shared in-memory buffer wrapped behind a `Write` impl. Used as the
    /// sink's target so tests can inspect what got written without touching
    /// the filesystem. Cloning the `Arc<Mutex<...>>` outside the box lets
    /// the test read while the drain task writes.
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);
    impl Write for SharedBuf {
        fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(bs);
            Ok(bs.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Poll until the buffer holds `target_lines` newline-terminated records
    /// or `timeout` elapses, then return a copy of its bytes. The lock is
    /// released before every sleep so the drain task can keep writing.
    async fn wait_for_lines(
        buf: &Arc<Mutex<Vec<u8>>>,
        target_lines: usize,
        timeout: Duration,
    ) -> Vec<u8> {
        let deadline = Instant::now() + timeout;
        loop {
            {
                let guard = buf.lock().unwrap();
                let count = guard.iter().filter(|b| **b == b'\n').count();
                if count >= target_lines || Instant::now() > deadline {
                    return guard.clone();
                }
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test]
    async fn json_lines_sink_writes_one_line_per_event() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
        sink.on_call(&mk_event(Outcome::Success));
        sink.on_call(&mk_event(Outcome::ToolNotFound));

        let out = wait_for_lines(&buf, 2, Duration::from_millis(500)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
        for line in &lines {
            let _: CallEvent = serde_json::from_str(line).expect("each line parses as CallEvent");
        }
    }

    // ---- SP-concurrency-baseline §5.4 mpsc rewrite tests ----

    #[tokio::test]
    async fn on_call_is_non_blocking_under_burst() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf)));
        let ev = mk_event(Outcome::Success);
        // 100 synchronous on_call invocations on the dispatch hot path should
        // take well under a millisecond each. The 50ms bound is deliberately
        // generous so slow CI machines don't make the test flaky.
        let started = Instant::now();
        for _ in 0..100 {
            sink.on_call(&ev);
        }
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(50),
            "100 on_call invocations took {elapsed:?}; expected <50ms"
        );
    }

    #[tokio::test]
    async fn drops_counter_increments_when_channel_full() {
        // `#[tokio::test]` uses a current-thread runtime, so the drain task
        // cannot run until this test yields. The 200 synchronous sends below
        // therefore fill the capacity-4 channel, and every later send is
        // dropped deterministically.
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new_with_capacity(Box::new(SharedBuf(buf)), 4);
        let ev = mk_event(Outcome::Success);
        for _ in 0..200 {
            sink.on_call(&ev);
        }
        assert_eq!(
            sink.drops(),
            196,
            "capacity=4 should accept exactly 4 of a 200-event burst"
        );
    }

    #[tokio::test]
    async fn events_eventually_drain_to_writer() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
        let ev = mk_event(Outcome::Success);
        for _ in 0..10 {
            sink.on_call(&ev);
        }
        let out = wait_for_lines(&buf, 10, Duration::from_millis(500)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
    }

    #[tokio::test]
    async fn dropping_sink_drains_pending_then_exits() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
            for _ in 0..5 {
                sink.on_call(&mk_event(Outcome::Success));
            }
            // Sink dropped here → sender closes → drain task finishes the
            // already-queued events and exits.
        }
        let out = wait_for_lines(&buf, 5, Duration::from_millis(500)).await;
        let lines: Vec<&str> = std::str::from_utf8(&out)
            .unwrap()
            .split_terminator('\n')
            .collect();
        assert_eq!(lines.len(), 5, "drop should flush the last 5 events");
    }

    #[test]
    fn now_rfc3339_format_is_parseable() {
        let s = now_rfc3339();
        chrono::DateTime::parse_from_rfc3339(&s).expect("RFC 3339 parseable");
    }
}