use std::path::Path;
use zeph_common::ToolName;
use crate::config::AuditConfig;
#[allow(clippy::trivially_copy_pass_by_ref)]
fn is_zero_u8(v: &u8) -> bool {
*v == 0
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct EgressEvent {
pub timestamp: String,
pub kind: &'static str,
pub correlation_id: String,
pub tool: ToolName,
pub url: String,
pub host: String,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<u16>,
pub duration_ms: u64,
pub response_bytes: usize,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub blocked: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub block_reason: Option<&'static str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub caller_id: Option<String>,
#[serde(default, skip_serializing_if = "is_zero_u8")]
pub hop: u8,
}
impl EgressEvent {
#[must_use]
pub fn new_correlation_id() -> String {
uuid::Uuid::new_v4().to_string()
}
}
#[derive(Debug)]
pub struct AuditLogger {
destination: AuditDestination,
}
#[derive(Debug)]
enum AuditDestination {
Stdout,
File(tokio::sync::Mutex<tokio::fs::File>),
}
#[derive(serde::Serialize)]
#[allow(clippy::struct_excessive_bools)]
pub struct AuditEntry {
pub timestamp: String,
pub tool: ToolName,
pub command: String,
pub result: AuditResult,
pub duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_category: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_domain: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_phase: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub claim_source: Option<crate::executor::ClaimSource>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mcp_server_id: Option<String>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub injection_flagged: bool,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub embedding_anomalous: bool,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub cross_boundary_mcp_to_acp: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub adversarial_policy_decision: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub exit_code: Option<i32>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub truncated: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub caller_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub policy_match: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub vigil_risk: Option<VigilRiskLevel>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum VigilRiskLevel {
Low,
Medium,
High,
}
#[derive(serde::Serialize)]
#[serde(tag = "type")]
pub enum AuditResult {
#[serde(rename = "success")]
Success,
#[serde(rename = "blocked")]
Blocked {
reason: String,
},
#[serde(rename = "error")]
Error {
message: String,
},
#[serde(rename = "timeout")]
Timeout,
#[serde(rename = "rollback")]
Rollback {
restored: usize,
deleted: usize,
},
}
impl AuditLogger {
#[allow(clippy::unused_async)]
pub async fn from_config(config: &AuditConfig, tui_mode: bool) -> Result<Self, std::io::Error> {
let effective_dest = if tui_mode && config.destination == "stdout" {
tracing::warn!("TUI mode: audit stdout redirected to file audit.jsonl");
"audit.jsonl".to_owned()
} else {
config.destination.clone()
};
let destination = if effective_dest == "stdout" {
AuditDestination::Stdout
} else {
let std_file = zeph_common::fs_secure::append_private(Path::new(&effective_dest))?;
let file = tokio::fs::File::from_std(std_file);
AuditDestination::File(tokio::sync::Mutex::new(file))
};
Ok(Self { destination })
}
pub async fn log(&self, entry: &AuditEntry) {
let json = match serde_json::to_string(entry) {
Ok(j) => j,
Err(err) => {
tracing::error!("audit entry serialization failed: {err}");
return;
}
};
match &self.destination {
AuditDestination::Stdout => {
tracing::info!(target: "audit", "{json}");
}
AuditDestination::File(file) => {
use tokio::io::AsyncWriteExt;
let mut f = file.lock().await;
let line = format!("{json}\n");
if let Err(e) = f.write_all(line.as_bytes()).await {
tracing::error!("failed to write audit log: {e}");
} else if let Err(e) = f.flush().await {
tracing::error!("failed to flush audit log: {e}");
}
}
}
}
pub async fn log_egress(&self, event: &EgressEvent) {
let json = match serde_json::to_string(event) {
Ok(j) => j,
Err(err) => {
tracing::error!("egress event serialization failed: {err}");
return;
}
};
match &self.destination {
AuditDestination::Stdout => {
tracing::info!(target: "audit", "{json}");
}
AuditDestination::File(file) => {
use tokio::io::AsyncWriteExt;
let mut f = file.lock().await;
let line = format!("{json}\n");
if let Err(e) = f.write_all(line.as_bytes()).await {
tracing::error!("failed to write egress log: {e}");
} else if let Err(e) = f.flush().await {
tracing::error!("failed to flush egress log: {e}");
}
}
}
}
}
pub fn log_tool_risk_summary(tool_ids: &[&str]) {
fn classify(id: &str) -> (&'static str, &'static str) {
if id.starts_with("shell") || id == "bash" || id == "exec" {
("high", "env_blocklist + command_blocklist")
} else if id.starts_with("web_scrape") || id == "fetch" || id.starts_with("scrape") {
("medium", "validate_url + SSRF + domain_policy")
} else if id.starts_with("file_write")
|| id.starts_with("file_read")
|| id.starts_with("file")
{
("medium", "path_sandbox")
} else {
("low", "schema_only")
}
}
for &id in tool_ids {
let (privilege, sanitization) = classify(id);
tracing::info!(
tool = id,
privilege_level = privilege,
expected_sanitization = sanitization,
"tool risk summary"
);
}
}
#[must_use]
pub fn chrono_now() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
format!("{secs}")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn audit_entry_serialization() {
let entry = AuditEntry {
timestamp: "1234567890".into(),
tool: "shell".into(),
command: "echo hello".into(),
result: AuditResult::Success,
duration_ms: 42,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"type\":\"success\""));
assert!(json.contains("\"tool\":\"shell\""));
assert!(json.contains("\"duration_ms\":42"));
}
#[test]
fn audit_result_blocked_serialization() {
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "sudo rm".into(),
result: AuditResult::Blocked {
reason: "blocked command: sudo".into(),
},
duration_ms: 0,
error_category: Some("policy_blocked".to_owned()),
error_domain: Some("action".to_owned()),
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"type\":\"blocked\""));
assert!(json.contains("\"reason\""));
}
#[test]
fn audit_result_error_serialization() {
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "bad".into(),
result: AuditResult::Error {
message: "exec failed".into(),
},
duration_ms: 0,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"type\":\"error\""));
}
#[test]
fn audit_result_timeout_serialization() {
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "sleep 999".into(),
result: AuditResult::Timeout,
duration_ms: 30000,
error_category: Some("timeout".to_owned()),
error_domain: Some("system".to_owned()),
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(json.contains("\"type\":\"timeout\""));
}
#[tokio::test]
async fn audit_logger_stdout() {
let config = AuditConfig {
enabled: true,
destination: "stdout".into(),
..Default::default()
};
let logger = AuditLogger::from_config(&config, false).await.unwrap();
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "echo test".into(),
result: AuditResult::Success,
duration_ms: 1,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
logger.log(&entry).await;
}
#[tokio::test]
async fn audit_logger_file() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("audit.log");
let config = AuditConfig {
enabled: true,
destination: path.display().to_string(),
..Default::default()
};
let logger = AuditLogger::from_config(&config, false).await.unwrap();
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "echo test".into(),
result: AuditResult::Success,
duration_ms: 1,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
logger.log(&entry).await;
let content = tokio::fs::read_to_string(&path).await.unwrap();
assert!(content.contains("\"tool\":\"shell\""));
}
#[tokio::test]
async fn audit_logger_file_write_error_logged() {
let config = AuditConfig {
enabled: true,
destination: "/nonexistent/dir/audit.log".into(),
..Default::default()
};
let result = AuditLogger::from_config(&config, false).await;
assert!(result.is_err());
}
#[test]
fn claim_source_serde_roundtrip() {
use crate::executor::ClaimSource;
let cases = [
(ClaimSource::Shell, "\"shell\""),
(ClaimSource::FileSystem, "\"file_system\""),
(ClaimSource::WebScrape, "\"web_scrape\""),
(ClaimSource::Mcp, "\"mcp\""),
(ClaimSource::A2a, "\"a2a\""),
(ClaimSource::CodeSearch, "\"code_search\""),
(ClaimSource::Diagnostics, "\"diagnostics\""),
(ClaimSource::Memory, "\"memory\""),
];
for (variant, expected_json) in cases {
let serialized = serde_json::to_string(&variant).unwrap();
assert_eq!(serialized, expected_json, "serialize {variant:?}");
let deserialized: ClaimSource = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, variant, "deserialize {variant:?}");
}
}
#[test]
fn audit_entry_claim_source_none_omitted() {
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "echo".into(),
result: AuditResult::Success,
duration_ms: 1,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(
!json.contains("claim_source"),
"claim_source must be omitted when None: {json}"
);
}
#[test]
fn audit_entry_claim_source_some_present() {
use crate::executor::ClaimSource;
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "echo".into(),
result: AuditResult::Success,
duration_ms: 1,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: Some(ClaimSource::Shell),
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(
json.contains("\"claim_source\":\"shell\""),
"expected claim_source=shell in JSON: {json}"
);
}
#[tokio::test]
async fn audit_logger_multiple_entries() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("audit.log");
let config = AuditConfig {
enabled: true,
destination: path.display().to_string(),
..Default::default()
};
let logger = AuditLogger::from_config(&config, false).await.unwrap();
for i in 0..5 {
let entry = AuditEntry {
timestamp: i.to_string(),
tool: "shell".into(),
command: format!("cmd{i}"),
result: AuditResult::Success,
duration_ms: i,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
logger.log(&entry).await;
}
let content = tokio::fs::read_to_string(&path).await.unwrap();
assert_eq!(content.lines().count(), 5);
}
#[test]
fn audit_entry_exit_code_serialized() {
let entry = AuditEntry {
timestamp: "0".into(),
tool: "shell".into(),
command: "echo hi".into(),
result: AuditResult::Success,
duration_ms: 5,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: Some(0),
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(
json.contains("\"exit_code\":0"),
"exit_code must be serialized: {json}"
);
}
#[test]
fn audit_entry_exit_code_none_omitted() {
let entry = AuditEntry {
timestamp: "0".into(),
tool: "file".into(),
command: "read /tmp/x".into(),
result: AuditResult::Success,
duration_ms: 1,
error_category: None,
error_domain: None,
error_phase: None,
claim_source: None,
mcp_server_id: None,
injection_flagged: false,
embedding_anomalous: false,
cross_boundary_mcp_to_acp: false,
adversarial_policy_decision: None,
exit_code: None,
truncated: false,
policy_match: None,
correlation_id: None,
caller_id: None,
vigil_risk: None,
};
let json = serde_json::to_string(&entry).unwrap();
assert!(
!json.contains("exit_code"),
"exit_code None must be omitted: {json}"
);
}
#[test]
fn log_tool_risk_summary_does_not_panic() {
log_tool_risk_summary(&[
"shell",
"bash",
"exec",
"web_scrape",
"fetch",
"scrape_page",
"file_write",
"file_read",
"file_delete",
"memory_search",
"unknown_tool",
]);
}
#[test]
fn log_tool_risk_summary_empty_input_does_not_panic() {
log_tool_risk_summary(&[]);
}
}