use std::io;
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use aa_security::CredentialFinding;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProxyAuditDecision {
Forwarded,
ForwardedRedacted,
Blocked,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyAuditEntry {
pub ts_ms: i64,
pub agent_id: Option<String>,
pub host: String,
pub method: String,
pub path: String,
pub decision: ProxyAuditDecision,
pub credential_findings: Vec<CredentialFinding>,
pub redacted_body: Option<String>,
}
pub struct JsonlWriter {
receiver: mpsc::Receiver<ProxyAuditEntry>,
file: tokio::io::BufWriter<tokio::fs::File>,
path: PathBuf,
}
impl JsonlWriter {
pub async fn new(path: &Path, receiver: mpsc::Receiver<ProxyAuditEntry>) -> io::Result<Self> {
let file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await?;
Ok(Self {
receiver,
file: tokio::io::BufWriter::new(file),
path: path.to_path_buf(),
})
}
pub fn path(&self) -> &Path {
&self.path
}
pub async fn run(mut self) {
tracing::info!(path = %self.path.display(), "proxy audit jsonl writer started");
while let Some(entry) = self.receiver.recv().await {
if let Err(e) = self.append(&entry).await {
tracing::error!(error = %e, "proxy audit jsonl write failed");
}
}
if let Err(e) = self.file.flush().await {
tracing::error!(error = %e, "proxy audit jsonl final flush failed");
}
tracing::info!(path = %self.path.display(), "proxy audit jsonl writer stopped");
}
async fn append(&mut self, entry: &ProxyAuditEntry) -> io::Result<()> {
let json = serde_json::to_string(entry).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
self.file.write_all(json.as_bytes()).await?;
self.file.write_all(b"\n").await?;
self.file.flush().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
const FAKE_AWS_ACCESS_KEY: &str = "AKIAIOSFODNN7EXAMPLE";
#[tokio::test]
async fn audit_writer_never_writes_raw_secret() {
use aa_security::CredentialScanner;
let body = format!(r#"{{"k":"{FAKE_AWS_ACCESS_KEY}"}}"#);
let scan = CredentialScanner::new().scan(&body);
assert!(
!scan.findings.is_empty(),
"scanner fixture invariant — AWS key must be detected"
);
let redacted = scan.redact(&body);
let entry = ProxyAuditEntry {
ts_ms: 1_700_000_000_000,
agent_id: Some("agent-1".into()),
host: "api.openai.com".into(),
method: "POST".into(),
path: "/v1/chat/completions".into(),
decision: ProxyAuditDecision::ForwardedRedacted,
credential_findings: scan.findings,
redacted_body: Some(redacted),
};
let tmp = tempfile::tempdir().expect("create tempdir");
let path = tmp.path().join("proxy-audit.jsonl");
let (tx, rx) = mpsc::channel(4);
let writer = JsonlWriter::new(&path, rx).await.expect("open jsonl writer");
let handle = tokio::spawn(writer.run());
tx.send(entry).await.expect("send entry");
drop(tx);
handle.await.expect("writer task joins cleanly");
let on_disk = tokio::fs::read_to_string(&path).await.expect("read JSONL");
assert!(
!on_disk.contains(FAKE_AWS_ACCESS_KEY),
"SECURITY INVARIANT VIOLATED: raw secret present in proxy audit JSONL: {on_disk}",
);
assert!(
on_disk.contains("[REDACTED:AwsAccessKey]"),
"JSONL must carry the [REDACTED:AwsAccessKey] marker, got: {on_disk}",
);
assert_eq!(
on_disk.matches('\n').count(),
1,
"single entry must produce exactly one trailing newline: {on_disk}",
);
}
fn clean_entry(host: &str, decision: ProxyAuditDecision) -> ProxyAuditEntry {
ProxyAuditEntry {
ts_ms: 1_700_000_000_000,
agent_id: None,
host: host.into(),
method: "GET".into(),
path: "/".into(),
decision,
credential_findings: vec![],
redacted_body: None,
}
}
#[tokio::test]
async fn writer_appends_one_jsonl_line_per_entry() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("audit.jsonl");
let (tx, rx) = mpsc::channel(8);
let writer = JsonlWriter::new(&path, rx).await.unwrap();
let handle = tokio::spawn(writer.run());
for host in ["a.example", "b.example", "c.example"] {
tx.send(clean_entry(host, ProxyAuditDecision::Forwarded)).await.unwrap();
}
drop(tx);
handle.await.unwrap();
let on_disk = tokio::fs::read_to_string(&path).await.unwrap();
let lines: Vec<&str> = on_disk.lines().collect();
assert_eq!(lines.len(), 3, "three entries → three lines");
for line in &lines {
serde_json::from_str::<ProxyAuditEntry>(line).expect("each line is a valid entry");
}
assert!(on_disk.contains("a.example") && on_disk.contains("c.example"));
}
#[tokio::test]
async fn writer_appends_to_existing_file_across_two_runs() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("audit.jsonl");
{
let (tx, rx) = mpsc::channel(4);
let writer = JsonlWriter::new(&path, rx).await.unwrap();
let handle = tokio::spawn(writer.run());
tx.send(clean_entry("first.example", ProxyAuditDecision::Blocked))
.await
.unwrap();
drop(tx);
handle.await.unwrap();
}
{
let (tx, rx) = mpsc::channel(4);
let writer = JsonlWriter::new(&path, rx).await.unwrap();
let handle = tokio::spawn(writer.run());
tx.send(clean_entry("second.example", ProxyAuditDecision::Forwarded))
.await
.unwrap();
drop(tx);
handle.await.unwrap();
}
let on_disk = tokio::fs::read_to_string(&path).await.unwrap();
assert_eq!(on_disk.lines().count(), 2, "append mode preserves prior content");
assert!(on_disk.contains("first.example"));
assert!(on_disk.contains("second.example"));
}
#[tokio::test]
async fn writer_with_no_entries_produces_empty_file() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("audit.jsonl");
let (tx, rx) = mpsc::channel(1);
let writer = JsonlWriter::new(&path, rx).await.unwrap();
let handle = tokio::spawn(writer.run());
drop(tx);
handle.await.unwrap();
let on_disk = tokio::fs::read_to_string(&path).await.unwrap();
assert!(on_disk.is_empty(), "no entries → empty file");
}
#[tokio::test]
async fn writer_exposes_its_path() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("audit.jsonl");
let (_tx, rx) = mpsc::channel(1);
let writer = JsonlWriter::new(&path, rx).await.unwrap();
assert_eq!(writer.path(), path.as_path());
}
#[tokio::test]
async fn writer_new_errors_when_parent_dir_missing() {
let tmp = tempfile::tempdir().unwrap();
let path = tmp.path().join("does/not/exist/audit.jsonl");
let (_tx, rx) = mpsc::channel(1);
match JsonlWriter::new(&path, rx).await {
Ok(_) => panic!("opening under a missing parent dir must fail"),
Err(e) => assert_eq!(e.kind(), io::ErrorKind::NotFound),
}
}
#[test]
fn decision_serializes_to_snake_case() {
let cases = [
(ProxyAuditDecision::Forwarded, "\"forwarded\""),
(ProxyAuditDecision::ForwardedRedacted, "\"forwarded_redacted\""),
(ProxyAuditDecision::Blocked, "\"blocked\""),
];
for (decision, expected) in cases {
assert_eq!(serde_json::to_string(&decision).unwrap(), expected);
let back: ProxyAuditDecision = serde_json::from_str(expected).unwrap();
assert_eq!(back, decision);
}
}
#[test]
fn entry_round_trips_through_json_preserving_fields() {
let entry = ProxyAuditEntry {
ts_ms: 42,
agent_id: Some("agent-x".into()),
host: "api.example".into(),
method: "POST".into(),
path: "/v1/do".into(),
decision: ProxyAuditDecision::ForwardedRedacted,
credential_findings: vec![],
redacted_body: Some("clean body".into()),
};
let json = serde_json::to_string(&entry).unwrap();
let back: ProxyAuditEntry = serde_json::from_str(&json).unwrap();
assert_eq!(back.ts_ms, 42);
assert_eq!(back.agent_id.as_deref(), Some("agent-x"));
assert_eq!(back.host, "api.example");
assert_eq!(back.decision, ProxyAuditDecision::ForwardedRedacted);
assert_eq!(back.redacted_body.as_deref(), Some("clean body"));
}
}