Skip to main content

agent_engine/extensions/
audit.rs

1//! Append-only provider invocation audit log.
2//!
3//! Records minimal metadata for each provider routing event without
4//! storing prompts or tool payloads. File: `$SYNAPS_BASE_DIR/extensions/audit.jsonl`.
5//!
6//! Design constraints:
7//!
8//! - One JSON object per line; missing file is equivalent to no entries.
9//! - Append-only: each new entry uses `O_APPEND` so concurrent appenders
10//!   on Unix produce well-formed line records without locking.
11//! - Malformed lines (e.g. partial write from a crash) are skipped on read
12//!   with a `tracing::warn!` so a corrupt line cannot lock the user out.
13//! - Never contains prompt text, tool inputs, tool outputs, or tokens.
14
15use std::fs::OpenOptions;
16use std::io::Write;
17use std::path::{Path, PathBuf};
18
19#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
20pub struct ProviderAuditEntry {
21    /// RFC3339 UTC timestamp.
22    pub timestamp: String,
23    pub plugin_id: String,
24    pub provider_id: String,
25    pub model_id: String,
26    /// Whether Synaps exposed any tool schemas to this invocation.
27    pub tools_exposed: bool,
28    /// Number of tool calls the provider requested during this invocation.
29    /// 0 if no tool-use loop ran.
30    #[serde(default)]
31    pub tools_requested: u32,
32    /// Whether the invocation streamed (vs. provider.complete).
33    #[serde(default)]
34    pub streamed: bool,
35    /// "ok" | "error" | "blocked" — high-level outcome.
36    pub outcome: String,
37    /// Optional short error class (e.g. "trust_disabled", "ipc_error", "timeout").
38    /// MUST NOT contain prompt or tool content.
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub error_class: Option<String>,
41}
42
43/// Path to the audit file under the active base dir. Caller is responsible for
44/// creating parent directories when writing.
45pub fn audit_file_path() -> PathBuf {
46    audit_file_path_for(&crate::config::base_dir())
47}
48
/// Builds `<base>/extensions/audit.jsonl` for an explicit base dir.
///
/// Pure path arithmetic: the filesystem is never touched. Used by the
/// base-dir-parameterised readers/writers and by tests.
pub(crate) fn audit_file_path_for(base: &Path) -> PathBuf {
    let mut location = base.to_path_buf();
    location.push("extensions");
    location.push("audit.jsonl");
    location
}
53
54/// Build a fresh entry with the current UTC timestamp.
55#[allow(clippy::too_many_arguments)]
56pub fn new_audit_entry(
57    plugin_id: impl Into<String>,
58    provider_id: impl Into<String>,
59    model_id: impl Into<String>,
60    tools_exposed: bool,
61    tools_requested: u32,
62    streamed: bool,
63    outcome: impl Into<String>,
64    error_class: Option<String>,
65) -> ProviderAuditEntry {
66    ProviderAuditEntry {
67        timestamp: chrono::Utc::now().to_rfc3339(),
68        plugin_id: plugin_id.into(),
69        provider_id: provider_id.into(),
70        model_id: model_id.into(),
71        tools_exposed,
72        tools_requested,
73        streamed,
74        outcome: outcome.into(),
75        error_class,
76    }
77}
78
79/// Append a single entry as one JSON line. Creates parent dirs and the file
80/// if missing. Atomic per-line via `O_APPEND` on Unix.
81pub fn append_audit_entry(entry: &ProviderAuditEntry) -> Result<(), String> {
82    append_audit_entry_to(&crate::config::base_dir(), entry)
83}
84
85/// Append an entry under an explicit base dir.
86pub(crate) fn append_audit_entry_to(
87    base: &Path,
88    entry: &ProviderAuditEntry,
89) -> Result<(), String> {
90    let path = audit_file_path_for(base);
91    let parent = path
92        .parent()
93        .ok_or_else(|| format!("audit.jsonl path has no parent: {}", path.display()))?;
94    std::fs::create_dir_all(parent)
95        .map_err(|e| format!("failed to create dir {}: {}", parent.display(), e))?;
96
97    let mut line = serde_json::to_string(entry)
98        .map_err(|e| format!("failed to serialize audit entry: {}", e))?;
99    line.push('\n');
100
101    let mut file = OpenOptions::new()
102        .create(true)
103        .append(true)
104        .open(&path)
105        .map_err(|e| format!("failed to open {}: {}", path.display(), e))?;
106    file.write_all(line.as_bytes())
107        .map_err(|e| format!("failed to append to {}: {}", path.display(), e))?;
108    Ok(())
109}
110
111/// Read all entries (one per line). Missing file → empty Vec. Malformed
112/// lines are skipped with a `tracing::warn!`.
113pub fn read_audit_entries() -> Result<Vec<ProviderAuditEntry>, String> {
114    read_audit_entries_from(&crate::config::base_dir())
115}
116
117/// Read entries under an explicit base dir.
118pub(crate) fn read_audit_entries_from(
119    base: &Path,
120) -> Result<Vec<ProviderAuditEntry>, String> {
121    let path = audit_file_path_for(base);
122    let contents = match std::fs::read_to_string(&path) {
123        Ok(s) => s,
124        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
125        Err(e) => {
126            return Err(format!(
127                "failed to read audit.jsonl at {}: {}",
128                path.display(),
129                e
130            ));
131        }
132    };
133    let mut entries = Vec::new();
134    for (idx, raw) in contents.lines().enumerate() {
135        let line = raw.trim();
136        if line.is_empty() {
137            continue;
138        }
139        match serde_json::from_str::<ProviderAuditEntry>(line) {
140            Ok(entry) => entries.push(entry),
141            Err(e) => {
142                tracing::warn!(
143                    target: "synaps::extensions::audit",
144                    "skipping malformed audit.jsonl line {} at {}: {}",
145                    idx + 1,
146                    path.display(),
147                    e
148                );
149            }
150        }
151    }
152    Ok(entries)
153}
154
155/// Read only the last `n` entries from the audit log.  Reads the whole file
156/// but collects lines into a fixed-capacity ring so only the tail N lines are
157/// ever parsed by serde — O(file_size) bytes read but O(N) deserialisation.
158/// Missing file → empty Vec.  Malformed tail lines are skipped with a warn.
159pub fn read_audit_entries_tail(n: usize) -> Result<Vec<ProviderAuditEntry>, String> {
160    read_audit_entries_tail_from(&crate::config::base_dir(), n)
161}
162
163/// `read_audit_entries_tail` under an explicit base dir (test helper).
164pub(crate) fn read_audit_entries_tail_from(
165    base: &Path,
166    n: usize,
167) -> Result<Vec<ProviderAuditEntry>, String> {
168    if n == 0 {
169        return Ok(Vec::new());
170    }
171    let path = audit_file_path_for(base);
172    let contents = match std::fs::read_to_string(&path) {
173        Ok(s) => s,
174        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
175        Err(e) => {
176            return Err(format!(
177                "failed to read audit.jsonl at {}: {}",
178                path.display(),
179                e
180            ));
181        }
182    };
183    // Collect the last N non-empty lines without parsing them yet — avoids
184    // deserialising the entire history when only the tail is needed.
185    let tail_lines: Vec<&str> = contents
186        .lines()
187        .filter(|l| !l.trim().is_empty())
188        .collect::<Vec<_>>()
189        .into_iter()
190        .rev()
191        .take(n)
192        .collect::<Vec<_>>()
193        .into_iter()
194        .rev()
195        .collect();
196
197    let mut entries = Vec::with_capacity(tail_lines.len());
198    for raw in tail_lines {
199        let line = raw.trim();
200        match serde_json::from_str::<ProviderAuditEntry>(line) {
201            Ok(entry) => entries.push(entry),
202            Err(e) => {
203                tracing::warn!(
204                    target: "synaps::extensions::audit",
205                    "skipping malformed audit.jsonl tail line at {}: {}",
206                    path.display(),
207                    e
208                );
209            }
210        }
211    }
212    Ok(entries)
213}
214
#[cfg(test)]
mod tests {
    //! Unit tests for the audit log. Every test works inside its own
    //! `TempDir` and goes through the `*_to` / `*_from` helpers, so the
    //! process-wide base dir is never touched.

    use super::*;
    use tempfile::TempDir;

    /// Fixed-timestamp entry so equality assertions are deterministic.
    fn sample(plugin: &str, outcome: &str) -> ProviderAuditEntry {
        ProviderAuditEntry {
            timestamp: "2025-01-01T00:00:00Z".to_string(),
            plugin_id: plugin.to_string(),
            provider_id: "p".to_string(),
            model_id: "m".to_string(),
            tools_exposed: false,
            tools_requested: 0,
            streamed: false,
            outcome: outcome.to_string(),
            error_class: None,
        }
    }

    #[test]
    fn audit_file_path_is_under_extensions_dir() {
        let dir = TempDir::new().unwrap();
        let p = audit_file_path_for(dir.path());
        assert_eq!(p, dir.path().join("extensions").join("audit.jsonl"));
    }

    #[test]
    fn append_two_entries_then_read_returns_them_in_order() {
        let dir = TempDir::new().unwrap();
        let a = sample("plug-a", "ok");
        let b = sample("plug-b", "blocked");
        append_audit_entry_to(dir.path(), &a).unwrap();
        append_audit_entry_to(dir.path(), &b).unwrap();
        let entries = read_audit_entries_from(dir.path()).unwrap();
        assert_eq!(entries, vec![a, b]);
    }

    #[test]
    fn read_missing_file_returns_empty() {
        let dir = TempDir::new().unwrap();
        let entries = read_audit_entries_from(dir.path()).unwrap();
        assert!(entries.is_empty());
    }

    #[test]
    fn malformed_line_in_middle_is_skipped() {
        let dir = TempDir::new().unwrap();
        let a = sample("plug-a", "ok");
        let c = sample("plug-c", "error");
        append_audit_entry_to(dir.path(), &a).unwrap();
        // Inject a malformed line.
        let path = audit_file_path_for(dir.path());
        let mut f = OpenOptions::new().append(true).open(&path).unwrap();
        f.write_all(b"{ this is not valid json\n").unwrap();
        drop(f);
        append_audit_entry_to(dir.path(), &c).unwrap();

        let entries = read_audit_entries_from(dir.path()).unwrap();
        assert_eq!(entries, vec![a, c]);
    }

    #[test]
    fn concurrent_appenders_produce_full_record_count() {
        let dir = TempDir::new().unwrap();
        let base = dir.path().to_path_buf();
        let mut handles = Vec::new();
        for t in 0..4u32 {
            let base = base.clone();
            handles.push(std::thread::spawn(move || {
                for i in 0..10u32 {
                    let mut e = sample(&format!("plug-{t}"), "ok");
                    e.tools_requested = i;
                    append_audit_entry_to(&base, &e).expect("append");
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let entries = read_audit_entries_from(&base).unwrap();
        assert_eq!(entries.len(), 40);
    }

    #[test]
    fn new_audit_entry_produces_rfc3339_timestamp() {
        let e = new_audit_entry(
            "plug",
            "prov",
            "model",
            true,
            0,
            false,
            "ok",
            None,
        );
        // Year-4-digits + 'T' separator + tz suffix ('Z' or '+'/'-' offset).
        let ts = &e.timestamp;
        assert!(ts.len() >= 20, "timestamp too short: {ts}");
        assert!(
            ts.chars().take(4).all(|c| c.is_ascii_digit()),
            "expected 4-digit year: {ts}"
        );
        assert!(ts.contains('T'), "expected 'T' separator: {ts}");
        assert!(
            ts.ends_with('Z') || ts.contains('+') || ts[10..].contains('-'),
            "expected timezone suffix: {ts}"
        );
        // chrono should be able to round-trip its own RFC3339 output.
        chrono::DateTime::parse_from_rfc3339(ts)
            .unwrap_or_else(|err| panic!("parse_from_rfc3339({ts}) failed: {err}"));
    }

    #[test]
    fn round_trip_with_error_class_omitted_when_none() {
        let dir = TempDir::new().unwrap();
        let mut e = sample("plug", "ok");
        e.error_class = None;
        append_audit_entry_to(dir.path(), &e).unwrap();
        let raw = std::fs::read_to_string(audit_file_path_for(dir.path())).unwrap();
        assert!(
            !raw.contains("error_class"),
            "error_class should be skipped when None: {raw}"
        );
        let loaded = read_audit_entries_from(dir.path()).unwrap();
        assert_eq!(loaded, vec![e]);
    }

    #[test]
    fn tail_with_zero_returns_empty() {
        let dir = TempDir::new().unwrap();
        append_audit_entry_to(dir.path(), &sample("plug-a", "ok")).unwrap();
        let entries = read_audit_entries_tail_from(dir.path(), 0).unwrap();
        assert!(entries.is_empty());
    }

    #[test]
    fn tail_missing_file_returns_empty() {
        let dir = TempDir::new().unwrap();
        let entries = read_audit_entries_tail_from(dir.path(), 5).unwrap();
        assert!(entries.is_empty());
    }

    #[test]
    fn tail_returns_last_n_in_file_order() {
        let dir = TempDir::new().unwrap();
        let a = sample("plug-a", "ok");
        let b = sample("plug-b", "blocked");
        let c = sample("plug-c", "error");
        for e in [&a, &b, &c] {
            append_audit_entry_to(dir.path(), e).unwrap();
        }
        let entries = read_audit_entries_tail_from(dir.path(), 2).unwrap();
        assert_eq!(entries, vec![b, c]);
    }

    #[test]
    fn tail_larger_than_file_returns_everything() {
        let dir = TempDir::new().unwrap();
        let a = sample("plug-a", "ok");
        let b = sample("plug-b", "blocked");
        append_audit_entry_to(dir.path(), &a).unwrap();
        append_audit_entry_to(dir.path(), &b).unwrap();
        let entries = read_audit_entries_tail_from(dir.path(), 10).unwrap();
        assert_eq!(entries, vec![a, b]);
    }

    #[test]
    fn tail_skips_malformed_line_within_window() {
        let dir = TempDir::new().unwrap();
        let a = sample("plug-a", "ok");
        let c = sample("plug-c", "error");
        append_audit_entry_to(dir.path(), &a).unwrap();
        let path = audit_file_path_for(dir.path());
        let mut f = OpenOptions::new().append(true).open(&path).unwrap();
        f.write_all(b"{ this is not valid json\n").unwrap();
        drop(f);
        append_audit_entry_to(dir.path(), &c).unwrap();

        // The window is the last two lines (malformed + c); the malformed
        // line occupies a slot but contributes no entry.
        let entries = read_audit_entries_tail_from(dir.path(), 2).unwrap();
        assert_eq!(entries, vec![c]);
    }
}