Skip to main content

atd_runtime/
audit.rs

1//! Structured per-call audit events + pluggable sinks.
2//!
3//! `AuditSink` is the observation hook called at dispatch return points.
4//! It sits OUTSIDE `Middleware` (a wire-reply rewriter) because audit
5//! observes *metadata* about every outcome — including failures — and never
6//! carries the result/error body (no PHI exit). `Middleware` rewrites the
7//! body the LLM sees (`on_result` for success / `ExecutionFailed`,
8//! `on_error` for `Response::Error`); `AuditSink` records who/what/when.
9//!
10//! `JsonLinesAuditSink` writes one JSON object per line via a dedicated
11//! **std thread** drain over a bounded `std::sync::mpsc::sync_channel`.
12//! SP-concurrency-baseline §5.4 introduced the queue to decouple the
13//! dispatch hot path from synchronous file I/O; SP-observability-
14//! completeness-v1 Axis B made the queue-full policy selectable
15//! ([`BackpressureStrategy`]: `Drop` default / `Block` / `FallbackSink`)
16//! and moved the drain to a std thread, so construction no longer requires
17//! a tokio runtime context.
18
19use chrono::Utc;
20use serde::{Deserialize, Serialize};
21use std::io::Write;
22use std::path::Path;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicU64, Ordering};
25
26/// Audit schema version. Consumers should branch on this if future
27/// breaking changes land.
28///
29/// - v1 (SP-operability-v1) — initial stable schema.
30/// - v2 (SP-pagination-v1) — adds optional `cursor_page` field. The field
31///   is `#[serde(default, skip_serializing_if = "Option::is_none")]` so
32///   v1 consumers tolerate v2 events; v2 consumers reading v1 events see
33///   `cursor_page: None`. The version bump records when the field landed,
34///   not a breaking shape change.
35/// - v3 (SP-observability-completeness-v1) — adds optional
36///   `capability_provenance`. Same additive-optional rule: v2 readers
37///   tolerate v3 events (ignore the field); v3 readers see `None` on v2
38///   events. Records per-capability source so an operator can answer
39///   "why did caller X have capability Y?" without re-deriving the chain.
40pub const SCHEMA_VERSION: u32 = 3;
41
42/// One per-call audit event. Emitted at every `Request::RunTool`
43/// return point (success, invalid_args, execution_failed, cap_denied,
44/// rate_limited, tool_not_found). Ping / Hello / ToolList / ToolSchema
45/// do NOT emit events in v1.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47#[non_exhaustive]
48pub struct CallEvent {
49    pub ts: String,
50    pub call_id: String,
51    pub tool_id: String,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub caller_id: Option<String>,
54    pub granted_capabilities: Vec<String>,
55    pub duration_ms: u64,
56    pub outcome: Outcome,
57    pub tier: String,
58    pub dry_run: bool,
59    pub schema_version: u32,
60    /// `true` iff a `TokenBroker` was configured AND it returned
61    /// `Ok(Some(_))` for this caller (SP-token-broker-phase1). Always
62    /// `false` for early-return paths (capability denied, dry-run,
63    /// rate-limited, tool-not-found) and for servers without a broker.
64    /// No key names or values are recorded.
65    #[serde(default)]
66    pub secrets_resolved: bool,
67    /// SP-pagination-v1 — 1-based page index for paginated calls. `None`
68    /// for non-paginated dispatches (the vast majority of events; saves
69    /// bytes in the audit log). `Some(1)` for the initial `RunTool` that
70    /// returned a cursor; `Some(2..)` for each `RunToolContinue`.
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub cursor_page: Option<u32>,
73    /// SP-observability-completeness-v1 Axis C — per-capability source
74    /// attribution. `None` when provenance wasn't tracked (back-compat,
75    /// early-return paths with no capability context); `Some(vec)` when
76    /// dispatch recorded which mechanism granted each capability. Lets an
77    /// operator trace each granted capability to the operator string
78    /// allow-list or a specific UCAN chain link.
79    #[serde(default, skip_serializing_if = "Option::is_none")]
80    pub capability_provenance: Option<Vec<CapProvenance>>,
81}
82
83impl CallEvent {
84    /// Stable constructor. `CallEvent` is `#[non_exhaustive]`: external crates
85    /// — adopters that emit their own audit events through an `AuditSink`
86    /// (e.g. celia's federation orchestrator) — MUST build it via `new` + the
87    /// `with_*` setters rather than a struct literal, so that adding an audit
88    /// field in a future minor is **not** a breaking change for them (the
89    /// `capability_provenance`/`cursor_page` additions that broke struct-literal
90    /// constructors are the motivation). Required fields are constructor args;
91    /// everything else defaults — `caller_id=None`, `granted_capabilities=[]`,
92    /// `dry_run=false`, `secrets_resolved=false`, `cursor_page=None`,
93    /// `capability_provenance=None`, `schema_version=SCHEMA_VERSION`.
94    pub fn new(
95        ts: impl Into<String>,
96        call_id: impl Into<String>,
97        tool_id: impl Into<String>,
98        duration_ms: u64,
99        outcome: Outcome,
100        tier: impl Into<String>,
101    ) -> Self {
102        Self {
103            ts: ts.into(),
104            call_id: call_id.into(),
105            tool_id: tool_id.into(),
106            caller_id: None,
107            granted_capabilities: Vec::new(),
108            duration_ms,
109            outcome,
110            tier: tier.into(),
111            dry_run: false,
112            schema_version: SCHEMA_VERSION,
113            secrets_resolved: false,
114            cursor_page: None,
115            capability_provenance: None,
116        }
117    }
118
119    pub fn with_caller_id(mut self, caller_id: Option<String>) -> Self {
120        self.caller_id = caller_id;
121        self
122    }
123    pub fn with_granted_capabilities(mut self, caps: Vec<String>) -> Self {
124        self.granted_capabilities = caps;
125        self
126    }
127    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
128        self.dry_run = dry_run;
129        self
130    }
131    pub fn with_secrets_resolved(mut self, resolved: bool) -> Self {
132        self.secrets_resolved = resolved;
133        self
134    }
135    pub fn with_cursor_page(mut self, page: Option<u32>) -> Self {
136        self.cursor_page = page;
137        self
138    }
139    pub fn with_capability_provenance(mut self, provenance: Option<Vec<CapProvenance>>) -> Self {
140        self.capability_provenance = provenance;
141        self
142    }
143}
144
145/// SP-observability-completeness-v1 Axis C — one capability + how it was
146/// granted. The `granted_capabilities` field on `CallEvent` records the
147/// *result* set; this records the *source* of each.
148#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
149pub struct CapProvenance {
150    pub cap: String,
151    pub source: ProvSource,
152}
153
154/// Where a granted capability came from (architecture §5.2 — the two
155/// composing mechanisms whose union forms `granted_capabilities`).
156#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
157#[serde(tag = "kind", rename_all = "snake_case")]
158pub enum ProvSource {
159    /// Granted by the operator string allow-list
160    /// (`--grant-capability` ∩ `Hello.requested_capabilities`).
161    StringAllowList,
162    /// Granted by a UCAN-lite chain link. `issuer_did` is the link's
163    /// `iss`; `chain_depth` is its position (0 = root).
164    UcanChain { issuer_did: String, chain_depth: u8 },
165}
166
167/// Outcome variants cover the full dispatch-return space for RunTool.
168#[derive(Debug, Clone, Serialize, Deserialize)]
169#[serde(tag = "kind", rename_all = "snake_case")]
170pub enum Outcome {
171    Success,
172    ExecutionFailed { code: String, retryable: bool },
173    InvalidArgs { message: String },
174    CapabilityDenied { missing: Vec<String> },
175    RateLimited { retry_after_ms: Option<u64> },
176    ToolNotFound,
177}
178
179/// SP-observability-completeness-v1 Axis B. How a sink behaves when its
180/// internal queue is full at `on_call` time.
181#[derive(Clone)]
182pub enum BackpressureStrategy {
183    /// Drop the event, increment `drops()`. The SP-concurrency-baseline
184    /// default — protects dispatch throughput; correct for the 90%
185    /// non-compliance case. "log loss >> dispatch stall."
186    Drop,
187    /// Block the dispatch path until the queue accepts the event. For
188    /// compliance adopters (HIPAA §164.528) where a dropped audit record is
189    /// unacceptable: dispatch slows under audit backpressure rather than
190    /// losing the disclosure record. Requires a multi-thread runtime (the
191    /// ref binaries use one) so a blocked worker doesn't starve accept.
192    Block,
193    /// On queue-full, write the event synchronously to a fallback sink
194    /// (e.g. stderr / a second file) instead of dropping. Bounds the hot
195    /// path (no indefinite block) with no silent loss. The fallback SHOULD
196    /// be a synchronous sink, never another queueing sink (avoid chained
197    /// blocking).
198    FallbackSink(Arc<dyn AuditSink>),
199}
200
201impl std::fmt::Debug for BackpressureStrategy {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        match self {
204            BackpressureStrategy::Drop => f.write_str("Drop"),
205            BackpressureStrategy::Block => f.write_str("Block"),
206            BackpressureStrategy::FallbackSink(_) => f.write_str("FallbackSink(..)"),
207        }
208    }
209}
210
211/// Observer hook. `on_call` is invoked synchronously on the dispatch path;
212/// its behaviour under queue pressure is the sink's
213/// [`backpressure_strategy`](AuditSink::backpressure_strategy). Must not panic.
214pub trait AuditSink: Send + Sync {
215    fn on_call(&self, event: &CallEvent);
216    /// Total events dropped because the sink's queue was full. Default `0`
217    /// for sinks that don't queue (custom synchronous adopter impls).
218    /// `JsonLinesAuditSink` overrides with its `Arc<AtomicU64>` counter so
219    /// `Server::metrics_snapshot()` (SP-concurrency-baseline §5.7) can
220    /// surface the count without coupling the metrics module to the
221    /// concrete sink type.
222    fn drops(&self) -> u64 {
223        0
224    }
225    /// SP-observability-completeness-v1 Axis B — the sink's queue-full
226    /// policy. Default `Drop` (byte-compatible with pre-SP sinks); medical /
227    /// compliance adopters override (or construct `JsonLinesAuditSink` via
228    /// `with_strategy`) to get `Block` / `FallbackSink`.
229    fn backpressure_strategy(&self) -> BackpressureStrategy {
230        BackpressureStrategy::Drop
231    }
232}
233
234/// Default channel capacity. 1024 events × ~500 bytes ≈ 512 KB peak buffer;
235/// drains at the rate the wrapped writer can absorb (typical disk write
236/// rate: 10k events/s sustained, transient bursts much higher).
237pub const DEFAULT_AUDIT_QUEUE_CAPACITY: usize = 1024;
238
239/// SP-concurrency-baseline §5.4 + SP-observability-completeness-v1 Axis B.
240/// Writes one JSON object per line to the wrapped writer via a dedicated
241/// **std thread** drain. Behaviour when the bounded channel is full is
242/// selectable via [`BackpressureStrategy`]:
243///
244/// - `Drop` (default) — `try_send`; on full, drop + bump `drops()`. The
245///   SP-concurrency-baseline behaviour (log loss >> dispatch stall).
246/// - `Block` — blocking `send`; dispatch slows rather than losing an event
247///   (HIPAA §164.528 no-loss audit). The drain is a dedicated OS thread, so
248///   blocking the sync `on_call` blocks the calling thread, not an async
249///   reactor primitive — use a multi-thread runtime (ref binaries do).
250/// - `FallbackSink(fb)` — on full, synchronously write to `fb`.
251///
252/// Construction no longer requires a tokio runtime context (the drain is a
253/// plain `std::thread`), unlike the prior tokio-mpsc implementation.
254pub struct JsonLinesAuditSink {
255    /// `Option` so `Drop` can take + drop the sender (closing the channel)
256    /// before joining the drain thread.
257    tx: Option<std::sync::mpsc::SyncSender<CallEvent>>,
258    /// Retained so `Drop` can join the drain thread and guarantee the
259    /// buffered tail flushes before the sink goes away (#4 no-loss-at-drop).
260    drain: Option<std::thread::JoinHandle<()>>,
261    drops: Arc<AtomicU64>,
262    strategy: BackpressureStrategy,
263}
264
265impl JsonLinesAuditSink {
266    /// Construct with default capacity (`DEFAULT_AUDIT_QUEUE_CAPACITY`) and
267    /// the `Drop` strategy (pre-SP behaviour).
268    pub fn new(writer: Box<dyn Write + Send + 'static>) -> Self {
269        Self::new_with_capacity(writer, DEFAULT_AUDIT_QUEUE_CAPACITY)
270    }
271
272    /// Construct with an explicit channel capacity, `Drop` strategy.
273    pub fn new_with_capacity(writer: Box<dyn Write + Send + 'static>, capacity: usize) -> Self {
274        Self::with_strategy(writer, capacity, BackpressureStrategy::Drop)
275    }
276
277    /// SP-observability-completeness-v1 Axis B — construct with an explicit
278    /// backpressure strategy. Spawns a std thread that owns the writer and
279    /// drains the channel.
280    pub fn with_strategy(
281        writer: Box<dyn Write + Send + 'static>,
282        capacity: usize,
283        strategy: BackpressureStrategy,
284    ) -> Self {
285        // SP-observability-completeness-v1 #3 — Block blocks the calling
286        // (sync) thread under backpressure. On a current_thread tokio runtime
287        // that thread is the sole worker, so a stall starves accept. Warn
288        // (best-effort) when we can detect that flavor at construction time.
289        if matches!(strategy, BackpressureStrategy::Block) {
290            if let Ok(h) = tokio::runtime::Handle::try_current() {
291                if h.runtime_flavor() == tokio::runtime::RuntimeFlavor::CurrentThread {
292                    eprintln!(
293                        "atd: WARNING — JsonLinesAuditSink Block strategy on a \
294                         current_thread runtime; a blocked worker can stall accept \
295                         under audit backpressure. Prefer a multi-thread runtime."
296                    );
297                }
298            }
299        }
300        let (tx, rx) = std::sync::mpsc::sync_channel::<CallEvent>(capacity);
301        let drops = Arc::new(AtomicU64::new(0));
302        let mut writer = writer;
303        let drain = std::thread::spawn(move || {
304            while let Ok(ev) = rx.recv() {
305                if let Ok(mut line) = serde_json::to_vec(&ev) {
306                    line.push(b'\n');
307                    let _ = writer.write_all(&line);
308                    let _ = writer.flush();
309                }
310            }
311            // All senders dropped + queue drained — final flush.
312            let _ = writer.flush();
313        });
314        Self {
315            tx: Some(tx),
316            drain: Some(drain),
317            drops,
318            strategy,
319        }
320    }
321
322    pub fn stdout() -> Self {
323        Self::new(Box::new(std::io::stdout()))
324    }
325
326    pub fn stderr() -> Self {
327        Self::new(Box::new(std::io::stderr()))
328    }
329
330    /// Open `path` for append; creates the file if missing.
331    pub fn file(path: &Path) -> std::io::Result<Self> {
332        let f = std::fs::OpenOptions::new()
333            .create(true)
334            .append(true)
335            .open(path)?;
336        Ok(Self::new(Box::new(f)))
337    }
338
339    /// Count of events dropped because the channel was full when
340    /// `on_call` was invoked (or the drain thread had exited).
341    pub fn drops(&self) -> u64 {
342        self.drops.load(Ordering::Relaxed)
343    }
344}
345
346impl AuditSink for JsonLinesAuditSink {
347    fn on_call(&self, event: &CallEvent) {
348        let Some(tx) = self.tx.as_ref() else {
349            // Sink is being torn down (tx taken in Drop); count as a drop.
350            self.drops.fetch_add(1, Ordering::Relaxed);
351            return;
352        };
353        match &self.strategy {
354            BackpressureStrategy::Drop => {
355                // Non-blocking; on full, drop + count. log loss >> stall.
356                if tx.try_send(event.clone()).is_err() {
357                    self.drops.fetch_add(1, Ordering::Relaxed);
358                }
359            }
360            BackpressureStrategy::Block => {
361                // Blocking send — no event lost; dispatch slows under
362                // backpressure. Err only if the drain thread is gone
363                // (shutdown), in which case count a drop rather than hang.
364                if tx.send(event.clone()).is_err() {
365                    self.drops.fetch_add(1, Ordering::Relaxed);
366                }
367            }
368            BackpressureStrategy::FallbackSink(fb) => {
369                if tx.try_send(event.clone()).is_err() {
370                    fb.on_call(event);
371                }
372            }
373        }
374    }
375    fn drops(&self) -> u64 {
376        self.drops.load(Ordering::Relaxed)
377    }
378    fn backpressure_strategy(&self) -> BackpressureStrategy {
379        self.strategy.clone()
380    }
381}
382
383impl Drop for JsonLinesAuditSink {
384    /// SP-observability-completeness-v1 #4 — close the channel, then join the
385    /// drain thread so the buffered tail flushes to the writer before the
386    /// sink goes away. Without this, a short-lived process (or a Block
387    /// "no-loss" adopter) could lose queued events at teardown. Joining can
388    /// block if the writer is wedged — that is the price of the no-loss
389    /// guarantee at shutdown.
390    fn drop(&mut self) {
391        self.tx.take(); // drop sender → drain loop ends after draining
392        if let Some(h) = self.drain.take() {
393            let _ = h.join();
394        }
395    }
396}
397
398/// Produce an RFC 3339 UTC timestamp string suitable for `CallEvent::ts`.
399/// Dispatch sites use this rather than calling chrono directly so the
400/// format stays consistent.
401pub fn now_rfc3339() -> String {
402    Utc::now().to_rfc3339()
403}
404
405#[cfg(test)]
406mod tests {
407    use super::*;
408    use std::sync::Mutex;
409
410    fn mk_event(outcome: Outcome) -> CallEvent {
411        CallEvent::new(
412            now_rfc3339(),
413            "01J000000000000000000000TEST",
414            "ref:echo.say",
415            17,
416            outcome,
417            "warm",
418        )
419        .with_caller_id(Some("test-client".into()))
420        .with_granted_capabilities(vec!["read".into(), "write".into()])
421    }
422
423    #[test]
424    fn callevent_builder_defaults_then_setters() {
425        let e = CallEvent::new(now_rfc3339(), "cid", "tool:x", 5, Outcome::Success, "warm");
426        assert_eq!(e.tool_id, "tool:x");
427        assert_eq!(e.duration_ms, 5);
428        assert_eq!(e.schema_version, SCHEMA_VERSION);
429        assert!(e.caller_id.is_none());
430        assert!(e.granted_capabilities.is_empty());
431        assert!(!e.dry_run);
432        assert!(!e.secrets_resolved);
433        assert!(e.cursor_page.is_none());
434        assert!(e.capability_provenance.is_none());
435        let e2 = e
436            .with_caller_id(Some("agent-A".into()))
437            .with_cursor_page(Some(2))
438            .with_secrets_resolved(true);
439        assert_eq!(e2.caller_id.as_deref(), Some("agent-A"));
440        assert_eq!(e2.cursor_page, Some(2));
441        assert!(e2.secrets_resolved);
442    }
443
444    #[test]
445    fn success_event_serializes() {
446        let e = mk_event(Outcome::Success);
447        let j: serde_json::Value =
448            serde_json::from_slice(&serde_json::to_vec(&e).expect("serialize")).expect("parse");
449        assert_eq!(j["tool_id"], "ref:echo.say");
450        assert_eq!(j["outcome"]["kind"], "success");
451        assert_eq!(j["schema_version"], 3);
452        assert_eq!(j["dry_run"], false);
453    }
454
455    #[test]
456    fn capability_denied_outcome_tagged_correctly() {
457        let e = mk_event(Outcome::CapabilityDenied {
458            missing: vec!["conformance.denied".into()],
459        });
460        let j: serde_json::Value =
461            serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
462        assert_eq!(j["outcome"]["kind"], "capability_denied");
463        assert_eq!(j["outcome"]["missing"][0], "conformance.denied");
464    }
465
466    #[test]
467    fn execution_failed_carries_code_and_retryable() {
468        let e = mk_event(Outcome::ExecutionFailed {
469            code: "FS_NOT_FOUND".into(),
470            retryable: false,
471        });
472        let j: serde_json::Value =
473            serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
474        assert_eq!(j["outcome"]["kind"], "execution_failed");
475        assert_eq!(j["outcome"]["code"], "FS_NOT_FOUND");
476        assert_eq!(j["outcome"]["retryable"], false);
477    }
478
479    #[test]
480    fn rate_limited_outcome_with_null_retry_after() {
481        let e = mk_event(Outcome::RateLimited {
482            retry_after_ms: None,
483        });
484        let j: serde_json::Value =
485            serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
486        assert_eq!(j["outcome"]["kind"], "rate_limited");
487        assert!(j["outcome"]["retry_after_ms"].is_null());
488    }
489
490    // ---- SP-observability-completeness-v1 Axis C: capability provenance ----
491
492    #[test]
493    fn capability_provenance_roundtrips_both_sources() {
494        let mut e = mk_event(Outcome::Success);
495        e.capability_provenance = Some(vec![
496            CapProvenance {
497                cap: "records:read".into(),
498                source: ProvSource::StringAllowList,
499            },
500            CapProvenance {
501                cap: "records:write".into(),
502                source: ProvSource::UcanChain {
503                    issuer_did: "did:key:zABC".into(),
504                    chain_depth: 1,
505                },
506            },
507        ]);
508        let j: serde_json::Value =
509            serde_json::from_slice(&serde_json::to_vec(&e).unwrap()).unwrap();
510        let prov = j["capability_provenance"].as_array().unwrap();
511        assert_eq!(prov[0]["cap"], "records:read");
512        assert_eq!(prov[0]["source"]["kind"], "string_allow_list");
513        assert_eq!(prov[1]["source"]["kind"], "ucan_chain");
514        assert_eq!(prov[1]["source"]["issuer_did"], "did:key:zABC");
515        assert_eq!(prov[1]["source"]["chain_depth"], 1);
516    }
517
518    #[test]
519    fn provenance_skipped_when_none() {
520        let e = mk_event(Outcome::Success);
521        let s = serde_json::to_string(&e).unwrap();
522        assert!(
523            !s.contains("capability_provenance"),
524            "None provenance must be omitted on the wire (back-compat), got: {s}"
525        );
526    }
527
528    #[test]
529    fn v2_event_without_provenance_deserializes_to_none() {
530        // A v2 audit line (no capability_provenance field) must read into
531        // a v3 consumer as None — adopters on old atd builds keep working.
532        let j = r#"{"ts":"2026-05-29T00:00:00+00:00","call_id":"01J","tool_id":"x",
533            "granted_capabilities":[],"duration_ms":1,"outcome":{"kind":"success"},
534            "tier":"warm","dry_run":false,"schema_version":2,"secrets_resolved":false}"#;
535        let e: CallEvent = serde_json::from_str(j).unwrap();
536        assert!(e.capability_provenance.is_none());
537        assert!(e.cursor_page.is_none());
538    }
539
540    #[test]
541    fn caller_id_skipped_when_none() {
542        let mut e = mk_event(Outcome::Success);
543        e.caller_id = None;
544        let s = serde_json::to_string(&e).unwrap();
545        assert!(
546            !s.contains("caller_id"),
547            "caller_id None should be skipped, got: {}",
548            s
549        );
550    }
551
552    /// Shared in-memory buffer wrapped behind a `Write` impl. Used as the
553    /// sink's target so tests can inspect what got written without touching
554    /// the filesystem. Cloning the `Arc<Mutex<...>>` outside the box lets
555    /// the test read while the drain task writes.
556    struct SharedBuf(Arc<Mutex<Vec<u8>>>);
557    impl Write for SharedBuf {
558        fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
559            self.0.lock().unwrap().extend_from_slice(bs);
560            Ok(bs.len())
561        }
562        fn flush(&mut self) -> std::io::Result<()> {
563            Ok(())
564        }
565    }
566
567    /// Spin until the buffer accumulates `target_lines` newline-terminated
568    /// records or `timeout` elapses. Returns the buffer's accumulated bytes.
569    async fn wait_for_lines(
570        buf: &Arc<Mutex<Vec<u8>>>,
571        target_lines: usize,
572        timeout: std::time::Duration,
573    ) -> Vec<u8> {
574        let deadline = std::time::Instant::now() + timeout;
575        loop {
576            {
577                let guard = buf.lock().unwrap();
578                let count = guard.iter().filter(|b| **b == b'\n').count();
579                if count >= target_lines || std::time::Instant::now() > deadline {
580                    return guard.clone();
581                }
582            }
583            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
584        }
585    }
586
587    #[tokio::test]
588    async fn json_lines_sink_writes_one_line_per_event() {
589        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
590        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
591        sink.on_call(&mk_event(Outcome::Success));
592        sink.on_call(&mk_event(Outcome::ToolNotFound));
593
594        let out = wait_for_lines(&buf, 2, std::time::Duration::from_millis(500)).await;
595        let text = String::from_utf8(out).unwrap();
596        let lines: Vec<&str> = text.split_terminator('\n').collect();
597        assert_eq!(lines.len(), 2, "expected 2 lines, got: {lines:?}");
598        for line in &lines {
599            let _: CallEvent = serde_json::from_str(line).expect("each line parses as CallEvent");
600        }
601    }
602
603    // ---- SP-concurrency-baseline §5.4 mpsc rewrite tests ----
604
605    #[tokio::test]
606    async fn on_call_is_non_blocking_under_burst() {
607        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
608        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf)));
609        let ev = mk_event(Outcome::Success);
610        // 100 synchronous on_call invocations on the dispatch hot path must
611        // complete in well under 10ms total — sub-millisecond per call once
612        // the channel is warm.
613        let started = std::time::Instant::now();
614        for _ in 0..100 {
615            sink.on_call(&ev);
616        }
617        let elapsed = started.elapsed();
618        assert!(
619            elapsed < std::time::Duration::from_millis(50),
620            "100 on_call invocations took {elapsed:?}; expected <50ms"
621        );
622    }
623
624    #[test]
625    fn drops_counter_increments_when_channel_full() {
626        // SlowBuf throttles the drain (2ms/write) so a 200-event Drop-strategy
627        // burst on a capacity-4 channel is GUARANTEED to saturate it before the
628        // drain catches up. Without the throttle the std-thread drain races the
629        // producer and could keep up on a fast/idle box → flaky `drops == 0`.
630        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
631        let sink = JsonLinesAuditSink::new_with_capacity(
632            Box::new(SlowBuf {
633                inner: buf,
634                delay: std::time::Duration::from_millis(2),
635            }),
636            4,
637        );
638        let ev = mk_event(Outcome::Success);
639        for _ in 0..200 {
640            sink.on_call(&ev);
641        }
642        assert!(
643            sink.drops() > 0,
644            "expected drops at capacity=4 with a 200-event burst against a slow drain, got 0"
645        );
646    }
647
648    #[tokio::test]
649    async fn events_eventually_drain_to_writer() {
650        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
651        let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
652        let ev = mk_event(Outcome::Success);
653        for _ in 0..10 {
654            sink.on_call(&ev);
655        }
656        let out = wait_for_lines(&buf, 10, std::time::Duration::from_millis(500)).await;
657        let text = String::from_utf8(out).unwrap();
658        let lines: Vec<&str> = text.split_terminator('\n').collect();
659        assert_eq!(lines.len(), 10, "expected 10 lines, got {}", lines.len());
660    }
661
662    #[tokio::test]
663    async fn dropping_sink_drains_pending_then_exits() {
664        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
665        {
666            let sink = JsonLinesAuditSink::new(Box::new(SharedBuf(buf.clone())));
667            for _ in 0..5 {
668                sink.on_call(&mk_event(Outcome::Success));
669            }
670            // Drop sink at end of block → tx closes → drain task finishes.
671        }
672        // Give the drain task time to consume the remaining queue and exit.
673        let out = wait_for_lines(&buf, 5, std::time::Duration::from_millis(500)).await;
674        let lines: Vec<&str> = std::str::from_utf8(&out)
675            .unwrap()
676            .split_terminator('\n')
677            .collect();
678        assert_eq!(lines.len(), 5, "drop should flush the last 5 events");
679    }
680
681    // ---- SP-observability-completeness-v1 Axis B: backpressure ----
682
683    /// A `Write` that sleeps per write — simulates a slow audit disk so a
684    /// burst outruns the drain and exercises the backpressure path.
685    struct SlowBuf {
686        inner: Arc<Mutex<Vec<u8>>>,
687        delay: std::time::Duration,
688    }
689    impl Write for SlowBuf {
690        fn write(&mut self, bs: &[u8]) -> std::io::Result<usize> {
691            std::thread::sleep(self.delay);
692            self.inner.lock().unwrap().extend_from_slice(bs);
693            Ok(bs.len())
694        }
695        fn flush(&mut self) -> std::io::Result<()> {
696            Ok(())
697        }
698    }
699
700    #[test]
701    fn bare_sink_defaults_to_drop_strategy() {
702        struct Bare;
703        impl AuditSink for Bare {
704            fn on_call(&self, _: &CallEvent) {}
705        }
706        assert!(matches!(
707            Bare.backpressure_strategy(),
708            BackpressureStrategy::Drop
709        ));
710    }
711
712    #[test]
713    fn with_strategy_block_reports_block() {
714        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
715        let sink = JsonLinesAuditSink::with_strategy(
716            Box::new(SharedBuf(buf)),
717            16,
718            BackpressureStrategy::Block,
719        );
720        assert!(matches!(
721            sink.backpressure_strategy(),
722            BackpressureStrategy::Block
723        ));
724    }
725
726    #[test]
727    fn block_strategy_loses_nothing_under_burst() {
728        // Throttled writer + tiny capacity + Block: every event must land,
729        // zero drops. Block makes on_call wait for queue space; dropping the
730        // sink JOINS the drain thread (#4), flushing the tail — so the
731        // assertion observes the final state directly instead of racing a
732        // deadline/poll loop (the previous flaky shape).
733        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
734        let sink = JsonLinesAuditSink::with_strategy(
735            Box::new(SlowBuf {
736                inner: buf.clone(),
737                delay: std::time::Duration::from_micros(50),
738            }),
739            4,
740            BackpressureStrategy::Block,
741        );
742        let ev = mk_event(Outcome::Success);
743        for _ in 0..100 {
744            sink.on_call(&ev);
745        }
746        assert_eq!(sink.drops(), 0, "Block strategy must never drop");
747        drop(sink); // joins the drain thread → all buffered events flushed
748        let n = buf.lock().unwrap().iter().filter(|b| **b == b'\n').count();
749        assert_eq!(
750            n, 100,
751            "Block must flush all 100 events by the time drop returns"
752        );
753    }
754
755    #[test]
756    fn fallback_strategy_routes_overflow_to_fallback() {
757        struct CountSink(Arc<AtomicU64>);
758        impl AuditSink for CountSink {
759            fn on_call(&self, _: &CallEvent) {
760                self.0.fetch_add(1, Ordering::Relaxed);
761            }
762        }
763        let fb_count = Arc::new(AtomicU64::new(0));
764        let buf = Arc::new(Mutex::new(Vec::<u8>::new()));
765        let sink = JsonLinesAuditSink::with_strategy(
766            Box::new(SlowBuf {
767                inner: buf,
768                delay: std::time::Duration::from_millis(5),
769            }),
770            1,
771            BackpressureStrategy::FallbackSink(Arc::new(CountSink(fb_count.clone()))),
772        );
773        let ev = mk_event(Outcome::Success);
774        for _ in 0..50 {
775            sink.on_call(&ev);
776        }
777        assert_eq!(sink.drops(), 0, "fallback caught overflow; primary drops 0");
778        assert!(
779            fb_count.load(Ordering::Relaxed) > 0,
780            "fallback sink must catch the overflow events"
781        );
782    }
783
784    #[test]
785    fn now_rfc3339_format_is_parseable() {
786        let s = now_rfc3339();
787        chrono::DateTime::parse_from_rfc3339(&s).expect("RFC 3339 parseable");
788    }
789}