Skip to main content

zeph_tools/
audit.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Structured JSONL audit logging for tool invocations.
5//!
6//! Every tool execution produces an [`AuditEntry`] that is serialized as a newline-delimited
7//! JSON record and written to the configured destination (stdout or a file).
8//!
9//! # Configuration
10//!
11//! Audit logging is controlled by [`AuditConfig`]. When
12//! `destination` is `"stdout"`, entries are emitted via `tracing::info!(target: "audit", ...)`.
13//! Any other value is treated as a file path opened in append mode.
14//!
15//! # Security note
16//!
17//! Audit entries intentionally omit the raw cosine distance from anomaly detection
18//! (`embedding_anomalous` is a boolean flag) to prevent threshold reverse-engineering.
19
20use std::path::Path;
21
22use zeph_common::ToolName;
23
24use crate::config::AuditConfig;
25
/// Async writer that appends [`AuditEntry`] records to a structured JSONL log.
///
/// Create via [`AuditLogger::from_config`] and share behind an `Arc`. Each executor
/// that should emit audit records accepts the logger via a builder method
/// (e.g. [`ShellExecutor::with_audit`](crate::ShellExecutor::with_audit)).
///
/// # Destinations
///
/// Records go either to stdout (through `tracing`, target `"audit"`) or to a file
/// opened in append mode. The destination is fixed when the logger is constructed.
///
/// # Thread safety
///
/// File writes are serialized through an internal `tokio::sync::Mutex<File>`.
/// Multiple concurrent log calls are safe but may wait briefly on the mutex.
#[derive(Debug)]
pub struct AuditLogger {
    /// Sink that receives every serialized entry; set once by [`AuditLogger::from_config`].
    destination: AuditDestination,
}
40
/// Sink chosen by [`AuditLogger::from_config`] for serialized audit records.
#[derive(Debug)]
enum AuditDestination {
    /// Emit each record through `tracing::info!` with target `"audit"`.
    Stdout,
    /// Append each record as one line to an open file; the async mutex serializes writers.
    File(tokio::sync::Mutex<tokio::fs::File>),
}
46
/// A single tool invocation record written to the audit log.
///
/// Serialized as a flat JSON object (newline-terminated). Optional fields are omitted
/// when `None` or `false` to keep entries compact. Keys appear in field declaration
/// order, so reordering fields changes the byte layout of emitted records.
///
/// NOTE(review): only `Serialize` is derived here, so the `default` serde attributes on
/// the boolean fields have no effect on output; they would only matter if `Deserialize`
/// were derived as well.
///
/// # Example JSON output
///
/// ```json
/// {"timestamp":"1712345678","tool":"shell","command":"ls -la","result":{"type":"success"},
///  "duration_ms":12,"claim_source":"shell","exit_code":0}
/// ```
#[derive(serde::Serialize)]
// Each boolean is an independent flag with its own JSON key, so they are kept as plain bools.
#[allow(clippy::struct_excessive_bools)]
pub struct AuditEntry {
    /// Unix timestamp (seconds) when the tool invocation started.
    pub timestamp: String,
    /// Tool identifier (e.g. `"shell"`, `"web_scrape"`, `"fetch"`).
    pub tool: ToolName,
    /// Human-readable command or URL being invoked.
    pub command: String,
    /// Outcome of the invocation.
    pub result: AuditResult,
    /// Wall-clock duration from invocation start to completion, in milliseconds.
    pub duration_ms: u64,
    /// Fine-grained error category label from the taxonomy. `None` for successful executions.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_category: Option<String>,
    /// High-level error domain for recovery dispatch. `None` for successful executions.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_domain: Option<String>,
    /// Invocation phase in which the error occurred per arXiv:2601.16280 taxonomy.
    /// `None` for successful executions.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_phase: Option<String>,
    /// Provenance of the tool result. `None` for non-executor audit entries (e.g. policy checks).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub claim_source: Option<crate::executor::ClaimSource>,
    /// MCP server ID for tool calls routed through `McpToolExecutor`. `None` for native tools.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mcp_server_id: Option<String>,
    /// Tool output was flagged by regex injection detection.
    /// The key is omitted from the JSON when the flag is `false`.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub injection_flagged: bool,
    /// Tool output was flagged as anomalous by the embedding guard.
    /// Raw cosine distance is NOT stored (prevents threshold reverse-engineering).
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub embedding_anomalous: bool,
    /// Tool result crossed the MCP-to-ACP trust boundary (MCP tool result served to an ACP client).
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub cross_boundary_mcp_to_acp: bool,
    /// Decision recorded by the adversarial policy agent before execution.
    ///
    /// Values: `"allow"`, `"deny:<reason>"`, `"error:<message>"`.
    /// `None` when adversarial policy is disabled or not applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub adversarial_policy_decision: Option<String>,
    /// Process exit code for shell tool executions. `None` for non-shell tools.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub exit_code: Option<i32>,
    /// Whether tool output was truncated before storage. Default false.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub truncated: bool,
    /// Caller identity that initiated this tool call. `None` for system calls.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub caller_id: Option<String>,
    /// Policy rule trace that matched this tool call. Populated from `PolicyDecision::trace`.
    /// `None` when policy is disabled or this entry is not from a policy check.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub policy_match: Option<String>,
}
117
118/// Outcome of a tool invocation, serialized as a tagged JSON object.
119///
120/// The `type` field selects the variant; additional fields are present only for the
121/// relevant variants.
122///
123/// # Serialization
124///
125/// ```json
126/// {"type":"success"}
127/// {"type":"blocked","reason":"sudo"}
128/// {"type":"error","message":"exec failed"}
129/// {"type":"timeout"}
130/// {"type":"rollback","restored":3,"deleted":1}
131/// ```
132#[derive(serde::Serialize)]
133#[serde(tag = "type")]
134pub enum AuditResult {
135    /// The tool executed successfully.
136    #[serde(rename = "success")]
137    Success,
138    /// The tool invocation was blocked by policy before execution.
139    #[serde(rename = "blocked")]
140    Blocked {
141        /// The matched blocklist pattern or policy rule that triggered the block.
142        reason: String,
143    },
144    /// The tool attempted execution but failed with an error.
145    #[serde(rename = "error")]
146    Error {
147        /// Human-readable error description.
148        message: String,
149    },
150    /// The tool exceeded its configured timeout.
151    #[serde(rename = "timeout")]
152    Timeout,
153    /// A transactional rollback was performed after a failed execution.
154    #[serde(rename = "rollback")]
155    Rollback {
156        /// Number of files restored to their pre-execution snapshot.
157        restored: usize,
158        /// Number of newly-created files that were deleted during rollback.
159        deleted: usize,
160    },
161}
162
163impl AuditLogger {
164    /// Create a new `AuditLogger` from config.
165    ///
166    /// When `tui_mode` is `true` and `config.destination` is `"stdout"`, the
167    /// destination is redirected to a file (`audit.jsonl` in the current directory)
168    /// to avoid corrupting the TUI output with raw JSON lines.
169    ///
170    /// # Errors
171    ///
172    /// Returns an error if a file destination cannot be opened.
173    pub async fn from_config(config: &AuditConfig, tui_mode: bool) -> Result<Self, std::io::Error> {
174        let effective_dest = if tui_mode && config.destination == "stdout" {
175            tracing::warn!("TUI mode: audit stdout redirected to file audit.jsonl");
176            "audit.jsonl".to_owned()
177        } else {
178            config.destination.clone()
179        };
180
181        let destination = if effective_dest == "stdout" {
182            AuditDestination::Stdout
183        } else {
184            let file = tokio::fs::OpenOptions::new()
185                .create(true)
186                .append(true)
187                .open(Path::new(&effective_dest))
188                .await?;
189            AuditDestination::File(tokio::sync::Mutex::new(file))
190        };
191
192        Ok(Self { destination })
193    }
194
195    /// Serialize `entry` to JSON and append it to the configured destination.
196    ///
197    /// Serialization errors are logged via `tracing::error!` and silently swallowed so
198    /// that audit failures never interrupt tool execution.
199    pub async fn log(&self, entry: &AuditEntry) {
200        let json = match serde_json::to_string(entry) {
201            Ok(j) => j,
202            Err(err) => {
203                tracing::error!("audit entry serialization failed: {err}");
204                return;
205            }
206        };
207
208        match &self.destination {
209            AuditDestination::Stdout => {
210                tracing::info!(target: "audit", "{json}");
211            }
212            AuditDestination::File(file) => {
213                use tokio::io::AsyncWriteExt;
214                let mut f = file.lock().await;
215                let line = format!("{json}\n");
216                if let Err(e) = f.write_all(line.as_bytes()).await {
217                    tracing::error!("failed to write audit log: {e}");
218                } else if let Err(e) = f.flush().await {
219                    tracing::error!("failed to flush audit log: {e}");
220                }
221            }
222        }
223    }
224}
225
226/// Log a per-tool risk summary at startup when `audit.tool_risk_summary = true`.
227///
228/// Each entry records tool name, privilege level (static mapping by tool id), and the
229/// expected input sanitization method. This is a design-time inventory label —
230/// NOT a runtime guarantee that sanitization is functioning correctly.
231pub fn log_tool_risk_summary(tool_ids: &[&str]) {
232    // Static privilege mapping: tool id prefix → (privilege level, expected sanitization).
233    // "high" = can execute arbitrary OS commands; "medium" = network/filesystem access;
234    // "low" = schema-validated parameters only.
235    fn classify(id: &str) -> (&'static str, &'static str) {
236        if id.starts_with("shell") || id == "bash" || id == "exec" {
237            ("high", "env_blocklist + command_blocklist")
238        } else if id.starts_with("web_scrape") || id == "fetch" || id.starts_with("scrape") {
239            ("medium", "validate_url + SSRF + domain_policy")
240        } else if id.starts_with("file_write")
241            || id.starts_with("file_read")
242            || id.starts_with("file")
243        {
244            ("medium", "path_sandbox")
245        } else {
246            ("low", "schema_only")
247        }
248    }
249
250    for &id in tool_ids {
251        let (privilege, sanitization) = classify(id);
252        tracing::info!(
253            tool = id,
254            privilege_level = privilege,
255            expected_sanitization = sanitization,
256            "tool risk summary"
257        );
258    }
259}
260
/// Returns the current Unix timestamp, in whole seconds, as a decimal string.
///
/// Used to populate [`AuditEntry::timestamp`]. Returns `"0"` if the system clock
/// is before the Unix epoch (which should never happen in practice).
#[must_use]
pub fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A clock set before the epoch yields an error; report it as second 0.
        .map_or(0, |elapsed| elapsed.as_secs())
        .to_string()
}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline fixture: a successful `shell` invocation with every optional field unset.
    ///
    /// Tests override only the fields they exercise via struct-update syntax, so a new
    /// field on [`AuditEntry`] has to be added here and nowhere else.
    fn base_entry() -> AuditEntry {
        AuditEntry {
            timestamp: "0".into(),
            tool: "shell".into(),
            command: "echo test".into(),
            result: AuditResult::Success,
            duration_ms: 1,
            error_category: None,
            error_domain: None,
            error_phase: None,
            claim_source: None,
            mcp_server_id: None,
            injection_flagged: false,
            embedding_anomalous: false,
            cross_boundary_mcp_to_acp: false,
            adversarial_policy_decision: None,
            exit_code: None,
            truncated: false,
            caller_id: None,
            policy_match: None,
        }
    }

    /// Serialize an entry the same way [`AuditLogger::log`] does.
    fn to_json(entry: &AuditEntry) -> String {
        serde_json::to_string(entry).unwrap()
    }

    /// Build a config that points the logger at `destination`.
    fn config_for(destination: String) -> AuditConfig {
        AuditConfig {
            enabled: true,
            destination,
            ..Default::default()
        }
    }

    #[test]
    fn audit_entry_serialization() {
        let json = to_json(&AuditEntry {
            timestamp: "1234567890".into(),
            command: "echo hello".into(),
            duration_ms: 42,
            ..base_entry()
        });
        assert!(json.contains("\"type\":\"success\""));
        assert!(json.contains("\"tool\":\"shell\""));
        assert!(json.contains("\"duration_ms\":42"));
    }

    #[test]
    fn audit_result_blocked_serialization() {
        let json = to_json(&AuditEntry {
            command: "sudo rm".into(),
            result: AuditResult::Blocked {
                reason: "blocked command: sudo".into(),
            },
            duration_ms: 0,
            error_category: Some("policy_blocked".to_owned()),
            error_domain: Some("action".to_owned()),
            ..base_entry()
        });
        assert!(json.contains("\"type\":\"blocked\""));
        assert!(json.contains("\"reason\""));
    }

    #[test]
    fn audit_result_error_serialization() {
        let json = to_json(&AuditEntry {
            command: "bad".into(),
            result: AuditResult::Error {
                message: "exec failed".into(),
            },
            duration_ms: 0,
            ..base_entry()
        });
        assert!(json.contains("\"type\":\"error\""));
    }

    #[test]
    fn audit_result_timeout_serialization() {
        let json = to_json(&AuditEntry {
            command: "sleep 999".into(),
            result: AuditResult::Timeout,
            duration_ms: 30000,
            error_category: Some("timeout".to_owned()),
            error_domain: Some("system".to_owned()),
            ..base_entry()
        });
        assert!(json.contains("\"type\":\"timeout\""));
    }

    #[test]
    fn audit_result_rollback_serialization() {
        // Pins the documented wire format of the only variant with two payload fields.
        let json = serde_json::to_string(&AuditResult::Rollback {
            restored: 3,
            deleted: 1,
        })
        .unwrap();
        assert_eq!(json, r#"{"type":"rollback","restored":3,"deleted":1}"#);
    }

    #[test]
    fn audit_entry_bool_flags_omitted_unless_set() {
        let quiet = to_json(&base_entry());
        for key in [
            "injection_flagged",
            "embedding_anomalous",
            "cross_boundary_mcp_to_acp",
            "truncated",
        ] {
            assert!(!quiet.contains(key), "{key} must be omitted when false: {quiet}");
        }

        let flagged = to_json(&AuditEntry {
            injection_flagged: true,
            truncated: true,
            ..base_entry()
        });
        assert!(flagged.contains("\"injection_flagged\":true"), "{flagged}");
        assert!(flagged.contains("\"truncated\":true"), "{flagged}");
    }

    #[tokio::test]
    async fn audit_logger_stdout() {
        let config = config_for("stdout".into());
        let logger = AuditLogger::from_config(&config, false).await.unwrap();
        logger.log(&base_entry()).await;
    }

    #[tokio::test]
    async fn audit_logger_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.log");
        let config = config_for(path.display().to_string());
        let logger = AuditLogger::from_config(&config, false).await.unwrap();
        logger.log(&base_entry()).await;

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        assert!(content.contains("\"tool\":\"shell\""));
    }

    // Despite the name, this exercises the open failure in `from_config`: the parent
    // directory does not exist, so no logger is ever constructed.
    #[tokio::test]
    async fn audit_logger_file_write_error_logged() {
        let config = config_for("/nonexistent/dir/audit.log".into());
        let result = AuditLogger::from_config(&config, false).await;
        assert!(result.is_err());
    }

    #[test]
    fn claim_source_serde_roundtrip() {
        use crate::executor::ClaimSource;
        let cases = [
            (ClaimSource::Shell, "\"shell\""),
            (ClaimSource::FileSystem, "\"file_system\""),
            (ClaimSource::WebScrape, "\"web_scrape\""),
            (ClaimSource::Mcp, "\"mcp\""),
            (ClaimSource::A2a, "\"a2a\""),
            (ClaimSource::CodeSearch, "\"code_search\""),
            (ClaimSource::Diagnostics, "\"diagnostics\""),
            (ClaimSource::Memory, "\"memory\""),
        ];
        for (variant, expected_json) in cases {
            let serialized = serde_json::to_string(&variant).unwrap();
            assert_eq!(serialized, expected_json, "serialize {variant:?}");
            let deserialized: ClaimSource = serde_json::from_str(&serialized).unwrap();
            assert_eq!(deserialized, variant, "deserialize {variant:?}");
        }
    }

    #[test]
    fn audit_entry_claim_source_none_omitted() {
        let json = to_json(&AuditEntry {
            command: "echo".into(),
            ..base_entry()
        });
        assert!(
            !json.contains("claim_source"),
            "claim_source must be omitted when None: {json}"
        );
    }

    #[test]
    fn audit_entry_claim_source_some_present() {
        use crate::executor::ClaimSource;
        let json = to_json(&AuditEntry {
            command: "echo".into(),
            claim_source: Some(ClaimSource::Shell),
            ..base_entry()
        });
        assert!(
            json.contains("\"claim_source\":\"shell\""),
            "expected claim_source=shell in JSON: {json}"
        );
    }

    #[tokio::test]
    async fn audit_logger_multiple_entries() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.log");
        let config = config_for(path.display().to_string());
        let logger = AuditLogger::from_config(&config, false).await.unwrap();

        for i in 0..5 {
            let entry = AuditEntry {
                timestamp: i.to_string(),
                command: format!("cmd{i}"),
                duration_ms: i,
                ..base_entry()
            };
            logger.log(&entry).await;
        }

        // One newline-terminated record per call, nothing merged or dropped.
        let content = tokio::fs::read_to_string(&path).await.unwrap();
        assert_eq!(content.lines().count(), 5);
    }

    #[test]
    fn audit_entry_exit_code_serialized() {
        let json = to_json(&AuditEntry {
            command: "echo hi".into(),
            duration_ms: 5,
            exit_code: Some(0),
            ..base_entry()
        });
        assert!(
            json.contains("\"exit_code\":0"),
            "exit_code must be serialized: {json}"
        );
    }

    #[test]
    fn audit_entry_exit_code_none_omitted() {
        let json = to_json(&AuditEntry {
            tool: "file".into(),
            command: "read /tmp/x".into(),
            ..base_entry()
        });
        assert!(
            !json.contains("exit_code"),
            "exit_code None must be omitted: {json}"
        );
    }

    #[test]
    fn log_tool_risk_summary_does_not_panic() {
        log_tool_risk_summary(&[
            "shell",
            "bash",
            "exec",
            "web_scrape",
            "fetch",
            "scrape_page",
            "file_write",
            "file_read",
            "file_delete",
            "memory_search",
            "unknown_tool",
        ]);
    }

    #[test]
    fn log_tool_risk_summary_empty_input_does_not_panic() {
        log_tool_risk_summary(&[]);
    }
}
662    #[test]
663    fn log_tool_risk_summary_empty_input_does_not_panic() {
664        log_tool_risk_summary(&[]);
665    }
666}