Skip to main content

atd_runtime/
audit.rs

//! Structured per-call audit events and pluggable sinks.
//!
//! `AuditSink` is the observation hook called at dispatch return points.
//! It sits OUTSIDE `Middleware` (a wire-reply rewriter) because audit
//! observes *metadata* about every outcome — including failures — and never
//! carries the result/error body (no PHI exit). `Middleware` rewrites the
//! body the LLM sees (`on_result` for success / `ExecutionFailed`,
//! `on_error` for `Response::Error`); `AuditSink` records who/what/when.
//!
//! `JsonLinesAuditSink` writes one JSON object per line via a dedicated
//! **std thread** drain over a bounded `std::sync::mpsc::sync_channel`.
//! SP-concurrency-baseline §5.4 introduced the queue to decouple the
//! dispatch hot path from synchronous file I/O. SP-observability-completeness-v1
//! Axis B then made the queue-full policy selectable
//! ([`BackpressureStrategy`]: `Drop` default / `Block` / `FallbackSink`)
//! and moved the drain to a std thread, so construction no longer requires
//! a tokio runtime context.
18
19use chrono::Utc;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::Path;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
/// Version number stamped into every audit event's `schema_version` field.
/// Consumers should branch on it if a breaking change ever lands.
///
/// Every bump so far has been additive and optional. Readers of an older
/// version tolerate newer events (unknown fields are ignored). Newer readers
/// see `None` for fields that are missing from older events.
///
/// - v1 (SP-operability-v1) — the initial stable schema.
/// - v2 (SP-pagination-v1) — adds the optional `cursor_page` field, declared
///   `#[serde(default, skip_serializing_if = "Option::is_none")]`. The bump
///   records when the field landed; it is not a breaking shape change.
/// - v3 (SP-observability-completeness-v1) — adds the optional
///   `capability_provenance` field under the same additive-optional rule.
///   It records the source of each granted capability, so an operator can
///   answer "why did caller X have capability Y?" without re-deriving the
///   chain.
pub const SCHEMA_VERSION: u32 = 3;
41
42/// One per-call audit event. Emitted at every `Request::RunTool`
43/// return point (success, invalid_args, execution_failed, cap_denied,
44/// rate_limited, tool_not_found). Ping / Hello / ToolList / ToolSchema
45/// do NOT emit events in v1.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct CallEvent {
48    pub ts: String,
49    pub call_id: String,
50    pub tool_id: String,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub caller_id: Option<String>,
53    pub granted_capabilities: Vec<String>,
54    pub duration_ms: u64,
55    pub outcome: Outcome,
56    pub tier: String,
57    pub dry_run: bool,
58    pub schema_version: u32,
59    /// `true` iff a `TokenBroker` was configured AND it returned
60    /// `Ok(Some(_))` for this caller (SP-token-broker-phase1). Always
61    /// `false` for early-return paths (capability denied, dry-run,
62    /// rate-limited, tool-not-found) and for servers without a broker.
63    /// No key names or values are recorded.
64    #[serde(default)]
65    pub secrets_resolved: bool,
66    /// SP-pagination-v1 — 1-based page index for paginated calls. `None`
67    /// for non-paginated dispatches (the vast majority of events; saves
68    /// bytes in the audit log). `Some(1)` for the initial `RunTool` that
69    /// returned a cursor; `Some(2..)` for each `RunToolContinue`.
70    #[serde(default, skip_serializing_if = "Option::is_none")]
71    pub cursor_page: Option<u32>,
72    /// SP-observability-completeness-v1 Axis C — per-capability source
73    /// attribution. `None` when provenance wasn't tracked (back-compat,
74    /// early-return paths with no capability context); `Some(vec)` when
75    /// dispatch recorded which mechanism granted each capability. Lets an
76    /// operator trace each granted capability to the operator string
77    /// allow-list or a specific UCAN chain link.
78    #[serde(default, skip_serializing_if = "Option::is_none")]
79    pub capability_provenance: Option<Vec<CapProvenance>>,
80}
81
82/// SP-observability-completeness-v1 Axis C — one capability + how it was
83/// granted. The `granted_capabilities` field on `CallEvent` records the
84/// *result* set; this records the *source* of each.
85#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
86pub struct CapProvenance {
87    pub cap: String,
88    pub source: ProvSource,
89}
90
91/// Where a granted capability came from (architecture §5.2 — the two
92/// composing mechanisms whose union forms `granted_capabilities`).
93#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
94#[serde(tag = "kind", rename_all = "snake_case")]
95pub enum ProvSource {
96    /// Granted by the operator string allow-list
97    /// (`--grant-capability` ∩ `Hello.requested_capabilities`).
98    StringAllowList,
99    /// Granted by a UCAN-lite chain link. `issuer_did` is the link's
100    /// `iss`; `chain_depth` is its position (0 = root).
101    UcanChain { issuer_did: String, chain_depth: u8 },
102}
103
104/// Outcome variants cover the full dispatch-return space for RunTool.
105#[derive(Debug, Clone, Serialize, Deserialize)]
106#[serde(tag = "kind", rename_all = "snake_case")]
107pub enum Outcome {
108    Success,
109    ExecutionFailed { code: String, retryable: bool },
110    InvalidArgs { message: String },
111    CapabilityDenied { missing: Vec<String> },
112    RateLimited { retry_after_ms: Option<u64> },
113    ToolNotFound,
114}
115
116/// SP-observability-completeness-v1 Axis B. How a sink behaves when its
117/// internal queue is full at `on_call` time.
118#[derive(Clone)]
119pub enum BackpressureStrategy {
120    /// Drop the event, increment `drops()`. The SP-concurrency-baseline
121    /// default — protects dispatch throughput; correct for the 90%
122    /// non-compliance case. "log loss >> dispatch stall."
123    Drop,
124    /// Block the dispatch path until the queue accepts the event. For
125    /// compliance adopters (HIPAA §164.528) where a dropped audit record is
126    /// unacceptable: dispatch slows under audit backpressure rather than
127    /// losing the disclosure record. Requires a multi-thread runtime (the
128    /// ref binaries use one) so a blocked worker doesn't starve accept.
129    Block,
130    /// On queue-full, write the event synchronously to a fallback sink
131    /// (e.g. stderr / a second file) instead of dropping. Bounds the hot
132    /// path (no indefinite block) with no silent loss. The fallback SHOULD
133    /// be a synchronous sink, never another queueing sink (avoid chained
134    /// blocking).
135    FallbackSink(Arc<dyn AuditSink>),
136}
137
138impl std::fmt::Debug for BackpressureStrategy {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        match self {
141            BackpressureStrategy::Drop => f.write_str("Drop"),
142            BackpressureStrategy::Block => f.write_str("Block"),
143            BackpressureStrategy::FallbackSink(_) => f.write_str("FallbackSink(..)"),
144        }
145    }
146}
147
148/// Observer hook. `on_call` is invoked synchronously on the dispatch path;
149/// its behaviour under queue pressure is the sink's
150/// [`backpressure_strategy`](AuditSink::backpressure_strategy). Must not panic.
151pub trait AuditSink: Send + Sync {
152    fn on_call(&self, event: &CallEvent);
153    /// Total events dropped because the sink's queue was full. Default `0`
154    /// for sinks that don't queue (custom synchronous adopter impls).
155    /// `JsonLinesAuditSink` overrides with its `Arc<AtomicU64>` counter so
156    /// `Server::metrics_snapshot()` (SP-concurrency-baseline §5.7) can
157    /// surface the count without coupling the metrics module to the
158    /// concrete sink type.
159    fn drops(&self) -> u64 {
160        0
161    }
162    /// SP-observability-completeness-v1 Axis B — the sink's queue-full
163    /// policy. Default `Drop` (byte-compatible with pre-SP sinks); medical /
164    /// compliance adopters override (or construct `JsonLinesAuditSink` via
165    /// `with_strategy`) to get `Block` / `FallbackSink`.
166    fn backpressure_strategy(&self) -> BackpressureStrategy {
167        BackpressureStrategy::Drop
168    }
169}
170
/// Default bound of the audit channel (1024 events).
///
/// At roughly 500 bytes per event this is about 512 KB of peak buffering.
/// The queue empties at whatever rate the wrapped writer can absorb; a
/// typical disk sustains around 10k events/s, with much higher transient
/// bursts.
pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1 << 10;
175
176/// SP-concurrency-baseline §5.4 + SP-observability-completeness-v1 Axis B.
177/// Writes one JSON object per line to the wrapped writer via a dedicated
178/// **std thread** drain. Behaviour when the bounded channel is full is
179/// selectable via [`BackpressureStrategy`]:
180///
181/// - `Drop` (default) — `try_send`; on full, drop + bump `drops()`. The
182///   SP-concurrency-baseline behaviour (log loss >> dispatch stall).
183/// - `Block` — blocking `send`; dispatch slows rather than losing an event
184///   (HIPAA §164.528 no-loss audit). The drain is a dedicated OS thread, so
185///   blocking the sync `on_call` blocks the calling thread, not an async
186///   reactor primitive — use a multi-thread runtime (ref binaries do).
187/// - `FallbackSink(fb)` — on full, synchronously write to `fb`.
188///
189/// Construction no longer requires a tokio runtime context (the drain is a
190/// plain `std::thread`), unlike the prior tokio-mpsc implementation.
191pub struct JsonLinesAuditSink {
192    /// `Option` so `Drop` can take + drop the sender (closing the channel)
193    /// before joining the drain thread.
194    tx: Option<std::sync::mpsc::SyncSender<CallEvent>>,
195    /// Retained so `Drop` can join the drain thread and guarantee the
196    /// buffered tail flushes before the sink goes away (#4 no-loss-at-drop).
197    drain: Option<std::thread::JoinHandle<()>>,
198    drops: Arc<AtomicU64>,
199    strategy: BackpressureStrategy,
200}
201
202impl JsonLinesAuditSink {
203    /// Construct with default capacity (`DEFAULT_AUDIT_QUEUE_CAPACITY`) and
204    /// the `Drop` strategy (pre-SP behaviour).
205    pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
206        Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
207    }
208
209    /// Construct with an explicit channel capacity, `Drop` strategy.
210    pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
211        Self::with_strategy(writer, capacity, BackpressureStrategy::Drop)
212    }
213
214    /// SP-observability-completeness-v1 Axis B — construct with an explicit
215    /// backpressure strategy. Spawns a std thread that owns the writer and
216    /// drains the channel.
217    pub fn with_strategy(
218        writer: Box<dyn Write + Send + 'static>,
219        capacity: usize,
220        strategy: BackpressureStrategy,
221    ) -> Self {
222        // SP-observability-completeness-v1 #3 — Block blocks the calling
223        // (sync) thread under backpressure. On a current_thread tokio runtime
224        // that thread is the sole worker, so a stall starves accept. Warn
225        // (best-effort) when we can detect that flavor at construction time.
226        if matches!(strategy, BackpressureStrategy::Block) {
227            if let Ok(h) = tokio::runtime::Handle::try_current() {
228                if h.runtime_flavor() == tokio::runtime::RuntimeFlavor::CurrentThread {
229                    eprintln!(
230                        "atd: WARNING — JsonLinesAuditSink Block strategy on a \
231                         current_thread runtime; a blocked worker can stall accept \
232                         under audit backpressure. Prefer a multi-thread runtime."
233                    );
234                }
235            }
236        }
237        let (tx, rx) = std::sync::mpsc::sync_channel::<CallEvent>(capacity);
238        let drops = Arc::new(AtomicU64::new(0));
239        let mut writer = writer;
240        let drain = std::thread::spawn(move || {
241            while let Ok(ev) = rx.recv() {
242                if let Ok(mut line) = serde_json::to_vec(&ev) {
243                    line.push(b'\n');
244                    let _ = writer.write_all(&line);
245                    let _ = writer.flush();
246                }
247            }
248            // All senders dropped + queue drained — final flush.
249            let _ = writer.flush();
250        });
251        Self {
252            tx: Some(tx),
253            drain: Some(drain),
254            drops,
255            strategy,
256        }
257    }
258
259    pub fn stdout() -> Self {
260        Self::new(Box::new(std::io::stdout()))
261    }
262
263    pub fn stderr() -> Self {
264        Self::new(Box::new(std::io::stderr()))
265    }
266
267    /// Open `path` for append; creates the file if missing.
268    pub fn file(path: &Path) -> std::io::Result<Self> {
269        let f = std::fs::OpenOptions::new()
270            .create(true)
271            .append(true)
272            .open(path)?;
273        Ok(Self::new(Box::new(f)))
274    }
275
276    /// Count of events dropped because the channel was full when
277    /// `on_call` was invoked (or the drain thread had exited).
278    pub fn drops(&self) -> u64 {
279        self.drops.load(Ordering::Relaxed)
280    }
281}
282
283impl AuditSink for JsonLinesAuditSink {
284    fn on_call(&self, event: &CallEvent) {
285        let Some(tx) = self.tx.as_ref() else {
286            // Sink is being torn down (tx taken in Drop); count as a drop.
287            self.drops.fetch_add(1, Ordering::Relaxed);
288            return;
289        };
290        match &self.strategy {
291            BackpressureStrategy::Drop => {
292                // Non-blocking; on full, drop + count. log loss >> stall.
293                if tx.try_send(event.clone()).is_err() {
294                    self.drops.fetch_add(1, Ordering::Relaxed);
295                }
296            }
297            BackpressureStrategy::Block => {
298                // Blocking send — no event lost; dispatch slows under
299                // backpressure. Err only if the drain thread is gone
300                // (shutdown), in which case count a drop rather than hang.
301                if tx.send(event.clone()).is_err() {
302                    self.drops.fetch_add(1, Ordering::Relaxed);
303                }
304            }
305            BackpressureStrategy::FallbackSink(fb) => {
306                if tx.try_send(event.clone()).is_err() {
307                    fb.on_call(event);
308                }
309            }
310        }
311    }
312    fn drops(&self) -> u64 {
313        self.drops.load(Ordering::Relaxed)
314    }
315    fn backpressure_strategy(&self) -> BackpressureStrategy {
316        self.strategy.clone()
317    }
318}
319
320impl Drop for JsonLinesAuditSink {
321    /// SP-observability-completeness-v1 #4 — close the channel, then join the
322    /// drain thread so the buffered tail flushes to the writer before the
323    /// sink goes away. Without this, a short-lived process (or a Block
324    /// "no-loss" adopter) could lose queued events at teardown. Joining can
325    /// block if the writer is wedged — that is the price of the no-loss
326    /// guarantee at shutdown.
327    fn drop(&mut self) {
328        self.tx.take(); // drop sender → drain loop ends after draining
329        if let Some(h) = self.drain.take() {
330            let _ = h.join();
331        }
332    }
333}
334
335/// Produce an RFC 3339 UTC timestamp string suitable for `CallEvent::ts`.
336/// Dispatch sites use this rather than calling chrono directly so the
337/// format stays consistent.
338pub fn now_rfc3339() -> String {
339    Utc::now().to_rfc3339()
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    fn mk_event(outcome: Outcome) -> CallEvent {
        CallEvent {
            ts: now_rfc3339(),
            call_id: "01J000000000000000000000TEST".into(),
            tool_id: "ref:echo.say".into(),
            caller_id: Some("test-client".into()),
            granted_capabilities: vec!["read".into(), "write".into()],
            duration_ms: 17,
            outcome,
            tier: "warm".into(),
            dry_run: false,
            schema_version: SCHEMA_VERSION,
            secrets_resolved: false,
            cursor_page: None,
            capability_provenance: None,
        }
    }

    /// Serialize `e` and parse it back as an untyped JSON value, so tests can
    /// assert on the exact wire shape.
    fn wire(e: &CallEvent) -> serde_json::Value {
        let bytes = serde_json::to_vec(e).expect("serialize");
        serde_json::from_slice(&bytes).expect("parse")
    }

    /// Number of newline-terminated records currently in `buf`.
    fn line_count(buf: &Mutex<Vec<u8>>) -> usize {
        buf.lock().unwrap().iter().filter(|b| **b == b'\n').count()
    }

    #[test]
    fn success_event_serializes() {
        let j = wire(&mk_event(Outcome::Success));
        assert_eq!(j["tool_id"], "ref:echo.say");
        assert_eq!(j["outcome"]["kind"], "success");
        assert_eq!(j["schema_version"], 3);
        assert_eq!(j["dry_run"], false);
    }

    #[test]
    fn capability_denied_outcome_tagged_correctly() {
        let j = wire(&mk_event(Outcome::CapabilityDenied {
            missing: vec!["conformance.denied".into()],
        }));
        assert_eq!(j["outcome"]["kind"], "capability_denied");
        assert_eq!(j["outcome"]["missing"][0], "conformance.denied");
    }

    #[test]
    fn execution_failed_carries_code_and_retryable() {
        let j = wire(&mk_event(Outcome::ExecutionFailed {
            code: "FS_NOT_FOUND".into(),
            retryable: false,
        }));
        assert_eq!(j["outcome"]["kind"], "execution_failed");
        assert_eq!(j["outcome"]["code"], "FS_NOT_FOUND");
        assert_eq!(j["outcome"]["retryable"], false);
    }

    #[test]
    fn rate_limited_outcome_with_null_retry_after() {
        let j = wire(&mk_event(Outcome::RateLimited {
            retry_after_ms: None,
        }));
        assert_eq!(j["outcome"]["kind"], "rate_limited");
        assert!(j["outcome"]["retry_after_ms"].is_null());
    }

    // ---- SP-observability-completeness-v1 Axis C: capability provenance ----

    #[test]
    fn capability_provenance_roundtrips_both_sources() {
        let mut e = mk_event(Outcome::Success);
        e.capability_provenance = Some(vec![
            CapProvenance {
                cap: "records:read".into(),
                source: ProvSource::StringAllowList,
            },
            CapProvenance {
                cap: "records:write".into(),
                source: ProvSource::UcanChain {
                    issuer_did: "did:key:zABC".into(),
                    chain_depth: 1,
                },
            },
        ]);
        let j = wire(&e);
        let prov = j["capability_provenance"].as_array().unwrap();
        assert_eq!(prov[0]["cap"], "records:read");
        assert_eq!(prov[0]["source"]["kind"], "string_allow_list");
        assert_eq!(prov[1]["source"]["kind"], "ucan_chain");
        assert_eq!(prov[1]["source"]["issuer_did"], "did:key:zABC");
        assert_eq!(prov[1]["source"]["chain_depth"], 1);
    }

    #[test]
    fn provenance_skipped_when_none() {
        let e = mk_event(Outcome::Success);
        let s = serde_json::to_string(&e).unwrap();
        assert!(
            !s.contains("capability_provenance"),
            "None provenance must be omitted on the wire (back-compat), got: {s}"
        );
    }

    #[test]
    fn v2_event_without_provenance_deserializes_to_none() {
        // A v2 audit line (no capability_provenance field) must read into
        // a v3 consumer as None — adopters on old atd builds keep working.
        let j = r#"{"ts":"2026-05-29T00:00:00+00:00","call_id":"01J","tool_id":"x",
            "granted_capabilities":[],"duration_ms":1,"outcome":{"kind":"success"},
            "tier":"warm","dry_run":false,"schema_version":2,"secrets_resolved":false}"#;
        let e: CallEvent = serde_json::from_str(j).unwrap();
        assert!(e.capability_provenance.is_none());
        assert!(e.cursor_page.is_none());
    }

    #[test]
    fn caller_id_skipped_when_none() {
        let mut e = mk_event(Outcome::Success);
        e.caller_id = None;
        let s = serde_json::to_string(&e).unwrap();
        assert!(
            !s.contains("caller_id"),
            "caller_id None should be skipped, got: {}",
            s
        );
    }

    /// Shared in-memory buffer behind a `Write` impl. Used as the sink's
    /// target so tests can inspect what was written without touching the
    /// filesystem. Keeping a clone of the `Arc<Mutex<...>>` outside the box
    /// lets the test read while the drain thread writes.
    struct SharedBuf(Arc<Mutex<Vec<u8>>>);
    impl Write for SharedBuf {
        fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
            self.0.lock().unwrap().extend_from_slice(bs);
            Ok(bs.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// Poll until the buffer holds `target_lines` newline-terminated records
    /// or `timeout` elapses, then return the buffer's bytes. Only needed when
    /// the sink must stay alive; after `drop(sink)` the output is final.
    async fn wait_for_lines(
        buf: &Arc<Mutex<Vec<u8>>>,
        target_lines: usize,
        timeout: Duration,
    ) -> Vec<u8> {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            {
                let guard = buf.lock().unwrap();
                let count = guard.iter().filter(|b| **b == b'\n').count();
                if count >= target_lines || std::time::Instant::now() > deadline {
                    return guard.clone();
                }
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[test]
    fn json_lines_sink_writes_one_line_per_event() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
        sink.on_call(&mk_event(Outcome::Success));
        sink.on_call(&mk_event(Outcome::ToolNotFound));
        // Dropping joins the drain thread, so the buffer is final here — no
        // deadline-based polling, no timing flake.
        drop(sink);

        let text = String::from_utf8(buf.lock().unwrap().clone()).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
        for line in &lines {
            let _: CallEvent = serde_json::from_str(line).expect("each line parses as CallEvent");
        }
    }

    // ---- SP-concurrency-baseline §5.4 mpsc rewrite tests ----

    #[test]
    fn on_call_is_non_blocking_under_burst() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf)));
        let ev = mk_event(Outcome::Success);
        // 100 synchronous on_call invocations on the dispatch hot path must
        // finish well within 50ms in total. Warm calls are sub-millisecond;
        // the bound leaves headroom for loaded CI machines.
        let started = std::time::Instant::now();
        for _ in 0..100 {
            sink.on_call(&ev);
        }
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(50),
            "100 on_call invocations took {elapsed:?}; expected <50ms"
        );
    }

    #[test]
    fn drops_counter_increments_when_channel_full() {
        // SlowBuf throttles the drain (2ms/write) so a 200-event Drop-strategy
        // burst on a capacity-4 channel is GUARANTEED to saturate it before the
        // drain catches up. Without the throttle the std-thread drain races the
        // producer and could keep up on a fast/idle box → flaky `drops == 0`.
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new_with_capacity(
            Box::new(SlowBuf {
                inner: buf,
                delay: Duration::from_millis(2),
            }),
            4,
        );
        let ev = mk_event(Outcome::Success);
        for _ in 0..200 {
            sink.on_call(&ev);
        }
        assert!(
            sink.drops() > 0,
            "expected drops at capacity=4 with a 200-event burst against a slow drain, got 0"
        );
    }

    #[tokio::test]
    async fn events_eventually_drain_to_writer() {
        // The sink stays alive while we poll. This checks that the drain
        // writes during normal operation, not only at teardown. The generous
        // deadline costs nothing on success because polling stops as soon as
        // all 10 lines are present.
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
        let ev = mk_event(Outcome::Success);
        for _ in 0..10 {
            sink.on_call(&ev);
        }
        let out = wait_for_lines(&buf, 10, Duration::from_secs(5)).await;
        let text = String::from_utf8(out).unwrap();
        let lines: Vec<&str> = text.split_terminator('\n').collect();
        assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
    }

    #[test]
    fn dropping_sink_drains_pending_then_exits() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        {
            let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
            for _ in 0..5 {
                sink.on_call(&mk_event(Outcome::Success));
            }
            // Leaving the scope drops the sink. The sender closes, and Drop
            // joins the drain thread after it has written the queued tail.
            // The join returning proves that the thread exited.
        }
        assert_eq!(line_count(&buf), 5, "drop should flush the last 5 events");
    }

    // ---- SP-observability-completeness-v1 Axis B: backpressure ----

    /// A `Write` that sleeps on every write, simulating a slow audit disk so
    /// that a burst outruns the drain and exercises the backpressure path.
    struct SlowBuf {
        inner: Arc<Mutex<Vec<u8>>>,
        delay: Duration,
    }
    impl Write for SlowBuf {
        fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
            std::thread::sleep(self.delay);
            self.inner.lock().unwrap().extend_from_slice(bs);
            Ok(bs.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn bare_sink_defaults_to_drop_strategy() {
        struct Bare;
        impl AuditSink for Bare {
            fn on_call(&self, _: &CallEvent) {}
        }
        assert!(matches!(
            Bare.backpressure_strategy(),
            BackpressureStrategy::Drop
        ));
    }

    #[test]
    fn with_strategy_block_reports_block() {
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::with_strategy(
            Box::new(SharedBuf(buf)),
            16,
            BackpressureStrategy::Block,
        );
        assert!(matches!(
            sink.backpressure_strategy(),
            BackpressureStrategy::Block
        ));
    }

    #[test]
    fn block_strategy_loses_nothing_under_burst() {
        // Throttled writer + tiny capacity + Block: every event must land,
        // with zero drops. Block makes on_call wait for queue space, and
        // dropping the sink JOINS the drain thread (#4), flushing the tail.
        // The assertion therefore observes the final state directly instead
        // of racing a deadline/poll loop.
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::with_strategy(
            Box::new(SlowBuf {
                inner: buf.clone(),
                delay: Duration::from_micros(50),
            }),
            4,
            BackpressureStrategy::Block,
        );
        let ev = mk_event(Outcome::Success);
        for _ in 0..100 {
            sink.on_call(&ev);
        }
        assert_eq!(sink.drops(), 0, "Block strategy must never drop");
        drop(sink); // joins the drain thread → all buffered events flushed
        assert_eq!(
            line_count(&buf),
            100,
            "Block must flush all 100 events by the time drop returns"
        );
    }

    #[test]
    fn fallback_strategy_routes_overflow_to_fallback() {
        struct CountSink(Arc<AtomicU64>);
        impl AuditSink for CountSink {
            fn on_call(&self, _: &CallEvent) {
                self.0.fetch_add(1, Ordering::Relaxed);
            }
        }
        let fb_count = Arc::new(AtomicU64::new(0));
        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
        let sink = JsonLinesAuditSink::with_strategy(
            Box::new(SlowBuf {
                inner: buf.clone(),
                delay: Duration::from_millis(5),
            }),
            1,
            BackpressureStrategy::FallbackSink(Arc::new(CountSink(fb_count.clone()))),
        );
        let ev = mk_event(Outcome::Success);
        for _ in 0..50 {
            sink.on_call(&ev);
        }
        assert_eq!(sink.drops(), 0, "fallback caught overflow; primary drops 0");
        // Join the drain so every event the primary accepted has been written.
        drop(sink);
        let fallback_hits = fb_count.load(Ordering::Relaxed);
        assert!(
            fallback_hits > 0,
            "fallback sink must catch the overflow events"
        );
        assert_eq!(
            line_count(&buf) as u64 + fallback_hits,
            50,
            "every event must land in exactly one of primary / fallback"
        );
    }

    #[test]
    fn now_rfc3339_format_is_parseable() {
        let s = now_rfc3339();
        chrono::DateTime::parse_from_rfc3339(&s).expect("RFC 3339 parseable");
    }
}