// arbiter_audit/sink.rs
1//! Audit output sinks: structured JSON lines to stdout and file.
2
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

use thiserror::Error;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;

use crate::entry::AuditEntry;
12
13/// Errors from writing audit entries.
/// Errors from writing audit entries.
#[derive(Debug, Error)]
pub enum SinkError {
    /// An `AuditEntry` could not be serialized to a JSON line.
    #[error("JSON serialization failed: {0}")]
    Serialize(#[from] serde_json::Error),

    /// Opening, appending to, flushing, or syncing the audit log file failed.
    #[error("file I/O failed: {0}")]
    Io(#[from] std::io::Error),
}
22
23/// Configuration for the audit sink.
/// Configuration for the audit sink.
///
/// Use [`Default`] for the standard setup: JSON lines to stdout, no file
/// output, 100 MB size-warning threshold, hash chaining enabled.
#[derive(Debug, Clone)]
pub struct AuditSinkConfig {
    /// Write JSON lines to stdout (12-factor compatible).
    pub write_stdout: bool,

    /// Optional path to an append-only audit log file.
    pub file_path: Option<PathBuf>,

    /// Maximum audit log file size in bytes before emitting warnings.
    /// Default: 100 MB. The sink emits tracing::warn when the file
    /// exceeds this size so operators can set up external log rotation.
    pub max_file_size_bytes: u64,

    /// Emit BLAKE3 hash-chained records (`chain_sequence`, `chain_prev_hash`,
    /// `chain_record_hash`) on each entry for tamper detection. Default: true.
    pub hash_chain: bool,
}
41
42/// Default max audit file size: 100 MB.
43const DEFAULT_MAX_AUDIT_FILE_SIZE: u64 = 100 * 1024 * 1024;
44
45impl Default for AuditSinkConfig {
46    fn default() -> Self {
47        Self {
48            write_stdout: true,
49            file_path: None,
50            max_file_size_bytes: DEFAULT_MAX_AUDIT_FILE_SIZE,
51            hash_chain: true,
52        }
53    }
54}
55
/// Hash chain state for tamper detection.
struct ChainState {
    /// Monotonic sequence counter.
    sequence: u64,
    /// Hash of the previous entry (hex-encoded).
    prev_hash: String,
}

/// Writes audit entries to configured outputs.
///
/// Tracks write failures via an atomic counter. When the file sink
/// fails (disk full, permissions), the proxy can surface this via
/// `X-Arbiter-Audit-Degraded` response headers.
pub struct AuditSink {
    config: AuditSinkConfig,
    stats: crate::stats::AuditStats,
    /// Consecutive write failures (0 = healthy). Incremented on each file
    /// write failure; cleared back to 0 only after `RECOVERY_THRESHOLD`
    /// consecutive successful writes (hysteresis), not on the first success.
    write_failures: AtomicU64,
    /// Total write failures since startup.
    total_write_failures: AtomicU64,
    /// Consecutive successes since last failure. Used for hysteresis:
    /// the sink must succeed N times before transitioning from degraded to healthy,
    /// preventing rapid flapping when the underlying issue is intermittent.
    recovery_successes: AtomicU64,
    /// Hash chain state for tamper detection (sequence + prev hash).
    chain: Mutex<ChainState>,
    /// Persistent file handle, opened once at construction to avoid
    /// the race window between open() and write() on each entry.
    /// `None` until `init_file` runs (or when no file path is configured).
    file: Option<Mutex<tokio::fs::File>>,
}
86
87impl AuditSink {
88    /// Create a new audit sink with the given configuration.
89    pub fn new(config: AuditSinkConfig) -> Self {
90        Self {
91            config,
92            stats: crate::stats::AuditStats::new(),
93            write_failures: AtomicU64::new(0),
94            total_write_failures: AtomicU64::new(0),
95            recovery_successes: AtomicU64::new(0),
96            chain: Mutex::new(ChainState {
97                sequence: 0,
98                prev_hash: "genesis".into(),
99            }),
100            file: None,
101        }
102    }
103
104    /// Open the persistent file handle. Call once after construction.
105    /// Using a separate init method because async isn't allowed in `new`.
106    pub async fn init_file(&mut self) -> Result<(), SinkError> {
107        if let Some(ref path) = self.config.file_path {
108            let file = OpenOptions::new()
109                .create(true)
110                .append(true)
111                .open(path)
112                .await?;
113            self.file = Some(Mutex::new(file));
114        }
115        Ok(())
116    }
117
118    /// Get a handle to the audit stats tracker for querying.
119    pub fn stats(&self) -> &crate::stats::AuditStats {
120        &self.stats
121    }
122
123    /// Consecutive successes required before transitioning from degraded to healthy.
124    /// Prevents flapping when the underlying issue is intermittent (e.g., disk pressure).
125    const RECOVERY_THRESHOLD: u64 = 3;
126
127    /// Returns true if the audit sink is degraded.
128    /// Hysteresis: once degraded, requires RECOVERY_THRESHOLD consecutive
129    /// successful writes before returning to healthy.
130    pub fn is_degraded(&self) -> bool {
131        self.write_failures.load(Ordering::Relaxed) > 0
132    }
133
134    /// Number of consecutive write failures (0 = healthy).
135    pub fn consecutive_failures(&self) -> u64 {
136        self.write_failures.load(Ordering::Relaxed)
137    }
138
139    /// Total write failures since startup.
140    pub fn total_failures(&self) -> u64 {
141        self.total_write_failures.load(Ordering::Relaxed)
142    }
143
144    /// Write an audit entry to all configured outputs.
145    ///
146    /// Writes to stdout and file sinks in order. The file sink is considered
147    /// critical -- errors are tracked and returned.
148    ///
149    /// When `hash_chain` is enabled, the chain guard is held across the file
150    /// write so sequence order matches on-disk order. A naive
151    /// top-to-bottom verifier is sufficient; we do not require callers to
152    /// sort by `chain_sequence` first.
153    pub async fn write(&self, entry: &AuditEntry) -> Result<(), SinkError> {
154        // Acquire the chain lock once and hold it across the entire write.
155        // Releasing it before the file write (as an earlier version did)
156        // allowed two concurrent callers to interleave: assigning sequence
157        // numbers in order N, N+1 but writing them to disk in order N+1, N.
158        // That left the cryptographic chain intact but broke top-to-bottom
159        // file-order verification.
160        let mut chain_guard = self.chain.lock().await;
161
162        let mut chained_entry = entry.clone();
163        if self.config.hash_chain {
164            chain_guard.sequence += 1;
165            chained_entry.chain_sequence = Some(chain_guard.sequence);
166            chained_entry.chain_prev_hash = Some(chain_guard.prev_hash.clone());
167            // chain_record_hash is computed over the entry WITH sequence and prev_hash
168            // but WITHOUT the record_hash itself.
169            chained_entry.chain_record_hash = None;
170            let pre_hash_json = serde_json::to_string(&chained_entry).unwrap_or_default();
171            let record_hash = blake3::hash(pre_hash_json.as_bytes()).to_hex().to_string();
172            chained_entry.chain_record_hash = Some(record_hash.clone());
173            chain_guard.prev_hash = record_hash;
174        }
175
176        let json = serde_json::to_string(&chained_entry)?;
177
178        if self.config.write_stdout {
179            // Structured JSON line to stdout via tracing (12-factor).
180            tracing::info!(target: "arbiter_audit", audit_entry = %json);
181        }
182
183        if let Some(path) = &self.config.file_path {
184            match self.write_to_file(path, &json).await {
185                Ok(()) => {
186                    let prev_failures = self.write_failures.load(Ordering::Relaxed);
187                    if prev_failures > 0 {
188                        // In recovery: count consecutive successes before clearing degraded state.
189                        let successes = self.recovery_successes.fetch_add(1, Ordering::Relaxed) + 1;
190                        if successes >= Self::RECOVERY_THRESHOLD {
191                            self.write_failures.store(0, Ordering::Relaxed);
192                            self.recovery_successes.store(0, Ordering::Relaxed);
193                            tracing::info!(
194                                threshold = Self::RECOVERY_THRESHOLD,
195                                "audit sink recovered after {} consecutive successful writes",
196                                successes
197                            );
198                        }
199                    }
200                }
201                Err(e) => {
202                    let consecutive = self.write_failures.fetch_add(1, Ordering::Relaxed) + 1;
203                    self.total_write_failures.fetch_add(1, Ordering::Relaxed);
204                    self.recovery_successes.store(0, Ordering::Relaxed);
205                    tracing::error!(
206                        error = %e,
207                        consecutive_failures = consecutive,
208                        "audit file write failed; audit data may be lost"
209                    );
210                    return Err(e);
211                }
212            }
213        }
214
215        // Update in-memory stats AFTER all writes succeed.
216        // Previously stats were updated before the write, causing counters
217        // to diverge from actual committed entries on write failure.
218        self.stats.record(entry).await;
219
220        Ok(())
221    }
222
223    async fn write_to_file(&self, path: &PathBuf, json: &str) -> Result<(), SinkError> {
224        // Use the persistent file handle if available (opened once at init).
225        // Falls back to per-write open for backward compatibility.
226        if let Some(ref file_mutex) = self.file {
227            let mut file = file_mutex.lock().await;
228            file.write_all(json.as_bytes()).await?;
229            file.write_all(b"\n").await?;
230            file.flush().await?;
231            file.sync_all().await?;
232            return Ok(());
233        }
234
235        // Fallback: open per-write (legacy path).
236        let mut file = OpenOptions::new()
237            .create(true)
238            .append(true)
239            .open(path)
240            .await?;
241        file.write_all(json.as_bytes()).await?;
242        file.write_all(b"\n").await?;
243        file.flush().await?;
244        file.sync_all().await?;
245        Ok(())
246    }
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use uuid::Uuid;

    /// Happy path: each write appends exactly one valid JSON line to the file.
    #[tokio::test]
    async fn write_to_file() {
        let dir = std::env::temp_dir().join(format!("arbiter-audit-test-{}", Uuid::new_v4()));
        let file_path = dir.join("audit.jsonl");
        tokio::fs::create_dir_all(&dir).await.unwrap();

        let sink = AuditSink::new(AuditSinkConfig {
            write_stdout: false,
            file_path: Some(file_path.clone()),
            ..Default::default()
        });

        let mut entry = AuditEntry::new(Uuid::new_v4());
        entry.agent_id = "test-agent".into();
        entry.tool_called = "test_tool".into();
        entry.latency_ms = 10;

        // Two writes -> two JSONL lines (uses the legacy per-write open path,
        // since init_file() is never called here).
        sink.write(&entry).await.unwrap();
        sink.write(&entry).await.unwrap();

        let contents = tokio::fs::read_to_string(&file_path).await.unwrap();
        let lines: Vec<&str> = contents.trim().lines().collect();
        assert_eq!(lines.len(), 2);

        // Each line should be valid JSON.
        let parsed: AuditEntry = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(parsed.agent_id, "test-agent");

        // Cleanup.
        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// Failed file writes increment both the consecutive and total counters
    /// and flip the sink into the degraded state.
    #[tokio::test]
    async fn tracks_write_failures() {
        // Point at a non-existent directory to force write failures.
        let sink = AuditSink::new(AuditSinkConfig {
            write_stdout: false,
            file_path: Some(PathBuf::from("/nonexistent/dir/audit.jsonl")),
            ..Default::default()
        });

        assert!(!sink.is_degraded());
        assert_eq!(sink.consecutive_failures(), 0);

        let mut entry = AuditEntry::new(Uuid::new_v4());
        entry.tool_called = "test".into();

        // First write should fail.
        assert!(sink.write(&entry).await.is_err());
        assert!(sink.is_degraded());
        assert_eq!(sink.consecutive_failures(), 1);
        assert_eq!(sink.total_failures(), 1);

        // Second failure increments.
        assert!(sink.write(&entry).await.is_err());
        assert_eq!(sink.consecutive_failures(), 2);
        assert_eq!(sink.total_failures(), 2);
    }

    /// Hysteresis: the degraded flag clears only after RECOVERY_THRESHOLD
    /// consecutive successful writes, not on the first success.
    #[tokio::test]
    async fn resets_failures_on_success() {
        let dir = std::env::temp_dir().join(format!("arbiter-audit-reset-{}", Uuid::new_v4()));
        let file_path = dir.join("audit.jsonl");

        // Start with bad path.
        let sink = AuditSink::new(AuditSinkConfig {
            write_stdout: false,
            file_path: Some(PathBuf::from("/nonexistent/dir/audit.jsonl")),
            ..Default::default()
        });

        let mut entry = AuditEntry::new(Uuid::new_v4());
        entry.tool_called = "test".into();

        // Force a failure.
        let _ = sink.write(&entry).await;
        assert!(sink.is_degraded());

        // Now create the real dir and point to it (simulate recovery).
        // Since config is immutable, we test with a new sink to prove the counter logic.
        tokio::fs::create_dir_all(&dir).await.unwrap();
        let recovered_sink = AuditSink::new(AuditSinkConfig {
            write_stdout: false,
            file_path: Some(file_path.clone()),
            ..Default::default()
        });
        // Manually simulate degraded state then recovery with hysteresis.
        recovered_sink.write_failures.store(3, Ordering::Relaxed);
        assert!(recovered_sink.is_degraded());

        // With hysteresis, RECOVERY_THRESHOLD consecutive successes needed.
        for i in 1..AuditSink::RECOVERY_THRESHOLD {
            recovered_sink.write(&entry).await.unwrap();
            assert!(
                recovered_sink.is_degraded(),
                "should still be degraded after {i} successful write(s)"
            );
        }
        // The Nth success clears the degraded state.
        recovered_sink.write(&entry).await.unwrap();
        assert!(!recovered_sink.is_degraded());
        assert_eq!(recovered_sink.consecutive_failures(), 0);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// Entries must serialize to a single JSON line that round-trips cleanly
    /// (a precondition for the JSONL file format).
    #[test]
    fn serialization_produces_valid_json() {
        let mut entry = AuditEntry::new(Uuid::new_v4());
        entry.agent_id = "test-agent".into();
        entry.tool_called = "dangerous_tool".into();
        entry.authorization_decision = "deny".into();
        entry.policy_matched = Some("block-dangerous".into());
        entry.anomaly_flags = vec!["scope_violation".into(), "unusual_hour".into()];
        entry.latency_ms = 7;
        entry.upstream_status = Some(403);

        let json = serde_json::to_string(&entry).unwrap();

        // The JSON must round-trip cleanly.
        let parsed: AuditEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.agent_id, "test-agent");
        assert_eq!(parsed.authorization_decision, "deny");
        assert_eq!(parsed.anomaly_flags.len(), 2);
        assert_eq!(parsed.upstream_status, Some(403));

        // The JSON must be a single line (suitable for JSONL).
        assert!(!json.contains('\n'), "JSON must be a single line");
    }
}