Skip to main content

zeph_tools/
audit.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Structured JSONL audit logging for tool invocations.
5//!
6//! Every tool execution produces an [`AuditEntry`] that is serialized as a newline-delimited
7//! JSON record and written to the configured destination (stdout or a file).
8//!
9//! # Configuration
10//!
11//! Audit logging is controlled by [`AuditConfig`]. When
12//! `destination` is `"stdout"`, entries are emitted via `tracing::info!(target: "audit", ...)`.
13//! Any other value is treated as a file path opened in append mode.
14//!
15//! # Security note
16//!
17//! Audit entries intentionally omit the raw cosine distance from anomaly detection
18//! (`embedding_anomalous` is a boolean flag) to prevent threshold reverse-engineering.
19
20use std::path::Path;
21
22use zeph_common::ToolName;
23
24use crate::config::AuditConfig;
25
26/// Async writer that appends [`AuditEntry`] records to a structured JSONL log.
27///
28/// Create via [`AuditLogger::from_config`] and share behind an `Arc`. Each executor
29/// that should emit audit records accepts the logger via a builder method
30/// (e.g. [`ShellExecutor::with_audit`](crate::ShellExecutor::with_audit)).
31///
32/// # Thread safety
33///
34/// File writes are serialized through an internal `tokio::sync::Mutex<File>`.
35/// Multiple concurrent log calls are safe but may block briefly on the mutex.
36#[derive(Debug)]
37pub struct AuditLogger {
38    destination: AuditDestination,
39}
40
41#[derive(Debug)]
42enum AuditDestination {
43    Stdout,
44    File(tokio::sync::Mutex<tokio::fs::File>),
45}
46
47/// A single tool invocation record written to the audit log.
48///
49/// Serialized as a flat JSON object (newline-terminated). Optional fields are omitted
50/// when `None` or `false` to keep entries compact.
51///
52/// # Example JSON output
53///
54/// ```json
55/// {"timestamp":"1712345678","tool":"shell","command":"ls -la","result":{"type":"success"},
56///  "duration_ms":12,"exit_code":0,"claim_source":"shell"}
57/// ```
58#[derive(serde::Serialize)]
59#[allow(clippy::struct_excessive_bools)]
60pub struct AuditEntry {
61    /// Unix timestamp (seconds) when the tool invocation started.
62    pub timestamp: String,
63    /// Tool identifier (e.g. `"shell"`, `"web_scrape"`, `"fetch"`).
64    pub tool: ToolName,
65    /// Human-readable command or URL being invoked.
66    pub command: String,
67    /// Outcome of the invocation.
68    pub result: AuditResult,
69    /// Wall-clock duration from invocation start to completion, in milliseconds.
70    pub duration_ms: u64,
71    /// Fine-grained error category label from the taxonomy. `None` for successful executions.
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub error_category: Option<String>,
74    /// High-level error domain for recovery dispatch. `None` for successful executions.
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub error_domain: Option<String>,
77    /// Invocation phase in which the error occurred per arXiv:2601.16280 taxonomy.
78    /// `None` for successful executions.
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub error_phase: Option<String>,
81    /// Provenance of the tool result. `None` for non-executor audit entries (e.g. policy checks).
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub claim_source: Option<crate::executor::ClaimSource>,
84    /// MCP server ID for tool calls routed through `McpToolExecutor`. `None` for native tools.
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub mcp_server_id: Option<String>,
87    /// Tool output was flagged by regex injection detection.
88    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
89    pub injection_flagged: bool,
90    /// Tool output was flagged as anomalous by the embedding guard.
91    /// Raw cosine distance is NOT stored (prevents threshold reverse-engineering).
92    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
93    pub embedding_anomalous: bool,
94    /// Tool result crossed the MCP-to-ACP trust boundary (MCP tool result served to an ACP client).
95    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
96    pub cross_boundary_mcp_to_acp: bool,
97    /// Decision recorded by the adversarial policy agent before execution.
98    ///
99    /// Values: `"allow"`, `"deny:<reason>"`, `"error:<message>"`.
100    /// `None` when adversarial policy is disabled or not applicable.
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub adversarial_policy_decision: Option<String>,
103    /// Process exit code for shell tool executions. `None` for non-shell tools.
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub exit_code: Option<i32>,
106    /// Whether tool output was truncated before storage. Default false.
107    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
108    pub truncated: bool,
109    /// Caller identity that initiated this tool call. `None` for system calls.
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub caller_id: Option<String>,
112    /// Policy rule trace that matched this tool call. Populated from `PolicyDecision::trace`.
113    /// `None` when policy is disabled or this entry is not from a policy check.
114    #[serde(skip_serializing_if = "Option::is_none")]
115    pub policy_match: Option<String>,
116}
117
118/// Outcome of a tool invocation, serialized as a tagged JSON object.
119///
120/// The `type` field selects the variant; additional fields are present only for the
121/// relevant variants.
122///
123/// # Serialization
124///
125/// ```json
126/// {"type":"success"}
127/// {"type":"blocked","reason":"sudo"}
128/// {"type":"error","message":"exec failed"}
129/// {"type":"timeout"}
130/// {"type":"rollback","restored":3,"deleted":1}
131/// ```
132#[derive(serde::Serialize)]
133#[serde(tag = "type")]
134pub enum AuditResult {
135    /// The tool executed successfully.
136    #[serde(rename = "success")]
137    Success,
138    /// The tool invocation was blocked by policy before execution.
139    #[serde(rename = "blocked")]
140    Blocked {
141        /// The matched blocklist pattern or policy rule that triggered the block.
142        reason: String,
143    },
144    /// The tool attempted execution but failed with an error.
145    #[serde(rename = "error")]
146    Error {
147        /// Human-readable error description.
148        message: String,
149    },
150    /// The tool exceeded its configured timeout.
151    #[serde(rename = "timeout")]
152    Timeout,
153    /// A transactional rollback was performed after a failed execution.
154    #[serde(rename = "rollback")]
155    Rollback {
156        /// Number of files restored to their pre-execution snapshot.
157        restored: usize,
158        /// Number of newly-created files that were deleted during rollback.
159        deleted: usize,
160    },
161}
162
163impl AuditLogger {
164    /// Create a new `AuditLogger` from config.
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if a file destination cannot be opened.
169    pub async fn from_config(config: &AuditConfig) -> Result<Self, std::io::Error> {
170        let destination = if config.destination == "stdout" {
171            AuditDestination::Stdout
172        } else {
173            let file = tokio::fs::OpenOptions::new()
174                .create(true)
175                .append(true)
176                .open(Path::new(&config.destination))
177                .await?;
178            AuditDestination::File(tokio::sync::Mutex::new(file))
179        };
180
181        Ok(Self { destination })
182    }
183
184    /// Serialize `entry` to JSON and append it to the configured destination.
185    ///
186    /// Serialization errors are logged via `tracing::error!` and silently swallowed so
187    /// that audit failures never interrupt tool execution.
188    pub async fn log(&self, entry: &AuditEntry) {
189        let json = match serde_json::to_string(entry) {
190            Ok(j) => j,
191            Err(err) => {
192                tracing::error!("audit entry serialization failed: {err}");
193                return;
194            }
195        };
196
197        match &self.destination {
198            AuditDestination::Stdout => {
199                tracing::info!(target: "audit", "{json}");
200            }
201            AuditDestination::File(file) => {
202                use tokio::io::AsyncWriteExt;
203                let mut f = file.lock().await;
204                let line = format!("{json}\n");
205                if let Err(e) = f.write_all(line.as_bytes()).await {
206                    tracing::error!("failed to write audit log: {e}");
207                } else if let Err(e) = f.flush().await {
208                    tracing::error!("failed to flush audit log: {e}");
209                }
210            }
211        }
212    }
213}
214
215/// Log a per-tool risk summary at startup when `audit.tool_risk_summary = true`.
216///
217/// Each entry records tool name, privilege level (static mapping by tool id), and the
218/// expected input sanitization method. This is a design-time inventory label —
219/// NOT a runtime guarantee that sanitization is functioning correctly.
220pub fn log_tool_risk_summary(tool_ids: &[&str]) {
221    // Static privilege mapping: tool id prefix → (privilege level, expected sanitization).
222    // "high" = can execute arbitrary OS commands; "medium" = network/filesystem access;
223    // "low" = schema-validated parameters only.
224    fn classify(id: &str) -> (&'static str, &'static str) {
225        if id.starts_with("shell") || id == "bash" || id == "exec" {
226            ("high", "env_blocklist + command_blocklist")
227        } else if id.starts_with("web_scrape") || id == "fetch" || id.starts_with("scrape") {
228            ("medium", "validate_url + SSRF + domain_policy")
229        } else if id.starts_with("file_write")
230            || id.starts_with("file_read")
231            || id.starts_with("file")
232        {
233            ("medium", "path_sandbox")
234        } else {
235            ("low", "schema_only")
236        }
237    }
238
239    for &id in tool_ids {
240        let (privilege, sanitization) = classify(id);
241        tracing::info!(
242            tool = id,
243            privilege_level = privilege,
244            expected_sanitization = sanitization,
245            "tool risk summary"
246        );
247    }
248}
249
/// Current Unix time in whole seconds, rendered as a decimal string.
///
/// This is the value stored in [`AuditEntry::timestamp`]. If the system clock reports a
/// time before the Unix epoch (not expected in practice) the result is `"0"`.
#[must_use]
pub fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A clock set before the epoch yields `Err`; fall back to zero seconds.
        .map_or(0, |elapsed| elapsed.as_secs())
        .to_string()
}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline `shell` entry: timestamp `"0"`, zero duration, every optional field unset.
    ///
    /// Tests override only the fields they care about with struct-update syntax
    /// (`AuditEntry { exit_code: Some(0), ..shell_entry(..) }`), so adding a field to
    /// [`AuditEntry`] means touching this one literal instead of every test.
    fn shell_entry(command: &str, result: AuditResult) -> AuditEntry {
        AuditEntry {
            timestamp: "0".into(),
            tool: "shell".into(),
            command: command.into(),
            result,
            duration_ms: 0,
            error_category: None,
            error_domain: None,
            error_phase: None,
            claim_source: None,
            mcp_server_id: None,
            injection_flagged: false,
            embedding_anomalous: false,
            cross_boundary_mcp_to_acp: false,
            adversarial_policy_decision: None,
            exit_code: None,
            truncated: false,
            caller_id: None,
            policy_match: None,
        }
    }

    /// Serializes `entry` to a JSON string, panicking if serialization fails.
    fn to_json(entry: &AuditEntry) -> String {
        serde_json::to_string(entry).unwrap()
    }

    /// Enabled [`AuditConfig`] pointing at `destination`; other settings stay at defaults.
    fn config_for(destination: impl Into<String>) -> AuditConfig {
        AuditConfig {
            enabled: true,
            destination: destination.into(),
            ..Default::default()
        }
    }

    #[test]
    fn audit_entry_serialization() {
        let entry = AuditEntry {
            timestamp: "1234567890".into(),
            duration_ms: 42,
            ..shell_entry("echo hello", AuditResult::Success)
        };
        let json = to_json(&entry);
        assert!(json.contains("\"type\":\"success\""));
        assert!(json.contains("\"tool\":\"shell\""));
        assert!(json.contains("\"duration_ms\":42"));
    }

    #[test]
    fn audit_result_blocked_serialization() {
        let entry = AuditEntry {
            error_category: Some("policy_blocked".to_owned()),
            error_domain: Some("action".to_owned()),
            ..shell_entry(
                "sudo rm",
                AuditResult::Blocked {
                    reason: "blocked command: sudo".into(),
                },
            )
        };
        let json = to_json(&entry);
        assert!(json.contains("\"type\":\"blocked\""));
        assert!(json.contains("\"reason\""));
    }

    #[test]
    fn audit_result_error_serialization() {
        let entry = shell_entry(
            "bad",
            AuditResult::Error {
                message: "exec failed".into(),
            },
        );
        let json = to_json(&entry);
        assert!(json.contains("\"type\":\"error\""));
    }

    #[test]
    fn audit_result_timeout_serialization() {
        let entry = AuditEntry {
            duration_ms: 30000,
            error_category: Some("timeout".to_owned()),
            error_domain: Some("system".to_owned()),
            ..shell_entry("sleep 999", AuditResult::Timeout)
        };
        let json = to_json(&entry);
        assert!(json.contains("\"type\":\"timeout\""));
    }

    /// Smoke test: logging to the `tracing` sink must not panic.
    #[tokio::test]
    async fn audit_logger_stdout() {
        let logger = AuditLogger::from_config(&config_for("stdout"))
            .await
            .unwrap();
        let entry = AuditEntry {
            duration_ms: 1,
            ..shell_entry("echo test", AuditResult::Success)
        };
        logger.log(&entry).await;
    }

    #[tokio::test]
    async fn audit_logger_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.log");
        let logger = AuditLogger::from_config(&config_for(path.display().to_string()))
            .await
            .unwrap();
        let entry = AuditEntry {
            duration_ms: 1,
            ..shell_entry("echo test", AuditResult::Success)
        };
        logger.log(&entry).await;

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        assert!(content.contains("\"tool\":\"shell\""));
    }

    // Despite the name, this covers the open failure in `from_config`: the parent
    // directory does not exist, so no logger is ever constructed and nothing is written.
    #[tokio::test]
    async fn audit_logger_file_write_error_logged() {
        let result = AuditLogger::from_config(&config_for("/nonexistent/dir/audit.log")).await;
        assert!(result.is_err());
    }

    #[test]
    fn claim_source_serde_roundtrip() {
        use crate::executor::ClaimSource;
        let cases = [
            (ClaimSource::Shell, "\"shell\""),
            (ClaimSource::FileSystem, "\"file_system\""),
            (ClaimSource::WebScrape, "\"web_scrape\""),
            (ClaimSource::Mcp, "\"mcp\""),
            (ClaimSource::A2a, "\"a2a\""),
            (ClaimSource::CodeSearch, "\"code_search\""),
            (ClaimSource::Diagnostics, "\"diagnostics\""),
            (ClaimSource::Memory, "\"memory\""),
        ];
        for (variant, expected_json) in cases {
            let serialized = serde_json::to_string(&variant).unwrap();
            assert_eq!(serialized, expected_json, "serialize {variant:?}");
            let deserialized: ClaimSource = serde_json::from_str(&serialized).unwrap();
            assert_eq!(deserialized, variant, "deserialize {variant:?}");
        }
    }

    #[test]
    fn audit_entry_claim_source_none_omitted() {
        let entry = AuditEntry {
            duration_ms: 1,
            ..shell_entry("echo", AuditResult::Success)
        };
        let json = to_json(&entry);
        assert!(
            !json.contains("claim_source"),
            "claim_source must be omitted when None: {json}"
        );
    }

    #[test]
    fn audit_entry_claim_source_some_present() {
        use crate::executor::ClaimSource;
        let entry = AuditEntry {
            duration_ms: 1,
            claim_source: Some(ClaimSource::Shell),
            ..shell_entry("echo", AuditResult::Success)
        };
        let json = to_json(&entry);
        assert!(
            json.contains("\"claim_source\":\"shell\""),
            "expected claim_source=shell in JSON: {json}"
        );
    }

    #[tokio::test]
    async fn audit_logger_multiple_entries() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("audit.log");
        let logger = AuditLogger::from_config(&config_for(path.display().to_string()))
            .await
            .unwrap();

        for i in 0..5 {
            let entry = AuditEntry {
                timestamp: i.to_string(),
                duration_ms: i,
                ..shell_entry(&format!("cmd{i}"), AuditResult::Success)
            };
            logger.log(&entry).await;
        }

        let content = tokio::fs::read_to_string(&path).await.unwrap();
        assert_eq!(content.lines().count(), 5);
        // Each record must be a complete JSON document on its own line.
        for line in content.lines() {
            serde_json::from_str::<serde_json::Value>(line).unwrap();
        }
    }

    #[test]
    fn audit_entry_exit_code_serialized() {
        let entry = AuditEntry {
            duration_ms: 5,
            exit_code: Some(0),
            ..shell_entry("echo hi", AuditResult::Success)
        };
        let json = to_json(&entry);
        assert!(
            json.contains("\"exit_code\":0"),
            "exit_code must be serialized: {json}"
        );
    }

    #[test]
    fn audit_entry_exit_code_none_omitted() {
        let entry = AuditEntry {
            tool: "file".into(),
            duration_ms: 1,
            ..shell_entry("read /tmp/x", AuditResult::Success)
        };
        let json = to_json(&entry);
        assert!(
            !json.contains("exit_code"),
            "exit_code None must be omitted: {json}"
        );
    }

    #[test]
    fn log_tool_risk_summary_does_not_panic() {
        log_tool_risk_summary(&[
            "shell",
            "bash",
            "exec",
            "web_scrape",
            "fetch",
            "scrape_page",
            "file_write",
            "file_read",
            "file_delete",
            "memory_search",
            "unknown_tool",
        ]);
    }

    #[test]
    fn log_tool_risk_summary_empty_input_does_not_panic() {
        log_tool_risk_summary(&[]);
    }
}
651    #[test]
652    fn log_tool_risk_summary_empty_input_does_not_panic() {
653        log_tool_risk_summary(&[]);
654    }
655}