1use std::path::Path;
21
22use zeph_common::ToolName;
23
24use crate::config::AuditConfig;
25
26#[allow(clippy::trivially_copy_pass_by_ref)]
27fn is_zero_u8(v: &u8) -> bool {
28 *v == 0
29}
30
31#[derive(Debug, Clone, serde::Serialize)]
45pub struct EgressEvent {
46 pub timestamp: String,
48 pub kind: &'static str,
51 pub correlation_id: String,
53 pub tool: ToolName,
55 pub url: String,
57 pub host: String,
59 pub method: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub status: Option<u16>,
64 pub duration_ms: u64,
66 pub response_bytes: usize,
69 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
71 pub blocked: bool,
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub block_reason: Option<&'static str>,
75 #[serde(skip_serializing_if = "Option::is_none")]
77 pub caller_id: Option<String>,
78 #[serde(default, skip_serializing_if = "is_zero_u8")]
81 pub hop: u8,
82}
83
84impl EgressEvent {
85 #[must_use]
87 pub fn new_correlation_id() -> String {
88 uuid::Uuid::new_v4().to_string()
89 }
90}
91
92#[derive(Debug)]
103pub struct AuditLogger {
104 destination: AuditDestination,
105}
106
107#[derive(Debug)]
108enum AuditDestination {
109 Stdout,
110 File(tokio::sync::Mutex<tokio::fs::File>),
111}
112
113#[derive(serde::Serialize)]
125#[allow(clippy::struct_excessive_bools)] pub struct AuditEntry {
127 pub timestamp: String,
129 pub tool: ToolName,
131 pub command: String,
133 pub result: AuditResult,
135 pub duration_ms: u64,
137 #[serde(skip_serializing_if = "Option::is_none")]
139 pub error_category: Option<String>,
140 #[serde(skip_serializing_if = "Option::is_none")]
142 pub error_domain: Option<String>,
143 #[serde(skip_serializing_if = "Option::is_none")]
146 pub error_phase: Option<String>,
147 #[serde(skip_serializing_if = "Option::is_none")]
149 pub claim_source: Option<crate::executor::ClaimSource>,
150 #[serde(skip_serializing_if = "Option::is_none")]
152 pub mcp_server_id: Option<String>,
153 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
155 pub injection_flagged: bool,
156 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
159 pub embedding_anomalous: bool,
160 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
162 pub cross_boundary_mcp_to_acp: bool,
163 #[serde(skip_serializing_if = "Option::is_none")]
168 pub adversarial_policy_decision: Option<String>,
169 #[serde(skip_serializing_if = "Option::is_none")]
171 pub exit_code: Option<i32>,
172 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
174 pub truncated: bool,
175 #[serde(skip_serializing_if = "Option::is_none")]
177 pub caller_id: Option<String>,
178 #[serde(skip_serializing_if = "Option::is_none")]
181 pub policy_match: Option<String>,
182 #[serde(skip_serializing_if = "Option::is_none")]
186 pub correlation_id: Option<String>,
187 #[serde(skip_serializing_if = "Option::is_none")]
190 pub vigil_risk: Option<VigilRiskLevel>,
191 #[serde(skip_serializing_if = "Option::is_none")]
194 pub execution_env: Option<String>,
195 #[serde(skip_serializing_if = "Option::is_none")]
198 pub resolved_cwd: Option<String>,
199 #[serde(skip_serializing_if = "Option::is_none")]
202 pub scope_at_definition: Option<String>,
203 #[serde(skip_serializing_if = "Option::is_none")]
206 pub scope_at_dispatch: Option<String>,
207}
208
209#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
214#[serde(rename_all = "lowercase")]
215pub enum VigilRiskLevel {
216 Low,
218 Medium,
220 High,
222}
223
224#[derive(serde::Serialize)]
239#[serde(tag = "type")]
240#[non_exhaustive]
241pub enum AuditResult {
242 #[serde(rename = "success")]
244 Success,
245 #[serde(rename = "blocked")]
247 Blocked {
248 reason: String,
250 },
251 #[serde(rename = "error")]
253 Error {
254 message: String,
256 },
257 #[serde(rename = "timeout")]
259 Timeout,
260 #[serde(rename = "rollback")]
262 Rollback {
263 restored: usize,
265 deleted: usize,
267 },
268}
269
270impl AuditLogger {
271 #[allow(clippy::unused_async)]
281 pub async fn from_config(config: &AuditConfig, tui_mode: bool) -> Result<Self, std::io::Error> {
282 use zeph_config::AuditDestination as CfgDest;
283
284 let destination = match &config.destination {
285 CfgDest::Stdout if tui_mode => {
286 tracing::warn!("TUI mode: audit stdout redirected to file audit.jsonl");
287 let std_file = zeph_common::fs_secure::append_private(Path::new("audit.jsonl"))?;
288 let file = tokio::fs::File::from_std(std_file);
289 AuditDestination::File(tokio::sync::Mutex::new(file))
290 }
291 CfgDest::File(path) => {
292 let std_file = zeph_common::fs_secure::append_private(path)?;
293 let file = tokio::fs::File::from_std(std_file);
294 AuditDestination::File(tokio::sync::Mutex::new(file))
295 }
296 _ => AuditDestination::Stdout,
297 };
298
299 Ok(Self { destination })
300 }
301
302 pub async fn log(&self, entry: &AuditEntry) {
307 let json = match serde_json::to_string(entry) {
308 Ok(j) => j,
309 Err(err) => {
310 tracing::error!("audit entry serialization failed: {err}");
311 return;
312 }
313 };
314
315 match &self.destination {
316 AuditDestination::Stdout => {
317 tracing::info!(target: "audit", "{json}");
318 }
319 AuditDestination::File(file) => {
320 use tokio::io::AsyncWriteExt;
321 let mut f = file.lock().await;
322 let line = format!("{json}\n");
323 if let Err(e) = f.write_all(line.as_bytes()).await {
324 tracing::error!("failed to write audit log: {e}");
325 } else if let Err(e) = f.flush().await {
326 tracing::error!("failed to flush audit log: {e}");
327 }
328 }
329 }
330 }
331
332 pub async fn log_egress(&self, event: &EgressEvent) {
340 let json = match serde_json::to_string(event) {
341 Ok(j) => j,
342 Err(err) => {
343 tracing::error!("egress event serialization failed: {err}");
344 return;
345 }
346 };
347
348 match &self.destination {
349 AuditDestination::Stdout => {
350 tracing::info!(target: "audit", "{json}");
351 }
352 AuditDestination::File(file) => {
353 use tokio::io::AsyncWriteExt;
354 let mut f = file.lock().await;
355 let line = format!("{json}\n");
356 if let Err(e) = f.write_all(line.as_bytes()).await {
357 tracing::error!("failed to write egress log: {e}");
358 } else if let Err(e) = f.flush().await {
359 tracing::error!("failed to flush egress log: {e}");
360 }
361 }
362 }
363 }
364}
365
366pub fn log_tool_risk_summary(tool_ids: &[&str]) {
372 fn classify(id: &str) -> (&'static str, &'static str) {
376 if id.starts_with("shell") || id == "bash" || id == "exec" {
377 ("high", "env_blocklist + command_blocklist")
378 } else if id.starts_with("web_scrape") || id == "fetch" || id.starts_with("scrape") {
379 ("medium", "validate_url + SSRF + domain_policy")
380 } else if id.starts_with("file_write")
381 || id.starts_with("file_read")
382 || id.starts_with("file")
383 {
384 ("medium", "path_sandbox")
385 } else {
386 ("low", "schema_only")
387 }
388 }
389
390 for &id in tool_ids {
391 let (privilege, sanitization) = classify(id);
392 tracing::info!(
393 tool = id,
394 privilege_level = privilege,
395 expected_sanitization = sanitization,
396 "tool risk summary"
397 );
398 }
399}
400
401#[must_use]
406pub fn chrono_now() -> String {
407 use std::time::{SystemTime, UNIX_EPOCH};
408 let secs = SystemTime::now()
409 .duration_since(UNIX_EPOCH)
410 .unwrap_or_default()
411 .as_secs();
412 format!("{secs}")
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418
419 #[test]
420 fn audit_entry_serialization() {
421 let entry = AuditEntry {
422 timestamp: "1234567890".into(),
423 tool: "shell".into(),
424 command: "echo hello".into(),
425 result: AuditResult::Success,
426 duration_ms: 42,
427 error_category: None,
428 error_domain: None,
429 error_phase: None,
430 claim_source: None,
431 mcp_server_id: None,
432 injection_flagged: false,
433 embedding_anomalous: false,
434 cross_boundary_mcp_to_acp: false,
435 adversarial_policy_decision: None,
436 exit_code: None,
437 truncated: false,
438 policy_match: None,
439 correlation_id: None,
440 caller_id: None,
441 vigil_risk: None,
442 execution_env: None,
443 resolved_cwd: None,
444 scope_at_definition: None,
445 scope_at_dispatch: None,
446 };
447 let json = serde_json::to_string(&entry).unwrap();
448 assert!(json.contains("\"type\":\"success\""));
449 assert!(json.contains("\"tool\":\"shell\""));
450 assert!(json.contains("\"duration_ms\":42"));
451 }
452
453 #[test]
454 fn audit_result_blocked_serialization() {
455 let entry = AuditEntry {
456 timestamp: "0".into(),
457 tool: "shell".into(),
458 command: "sudo rm".into(),
459 result: AuditResult::Blocked {
460 reason: "blocked command: sudo".into(),
461 },
462 duration_ms: 0,
463 error_category: Some("policy_blocked".to_owned()),
464 error_domain: Some("action".to_owned()),
465 error_phase: None,
466 claim_source: None,
467 mcp_server_id: None,
468 injection_flagged: false,
469 embedding_anomalous: false,
470 cross_boundary_mcp_to_acp: false,
471 adversarial_policy_decision: None,
472 exit_code: None,
473 truncated: false,
474 policy_match: None,
475 correlation_id: None,
476 caller_id: None,
477 vigil_risk: None,
478 execution_env: None,
479 resolved_cwd: None,
480 scope_at_definition: None,
481 scope_at_dispatch: None,
482 };
483 let json = serde_json::to_string(&entry).unwrap();
484 assert!(json.contains("\"type\":\"blocked\""));
485 assert!(json.contains("\"reason\""));
486 }
487
488 #[test]
489 fn audit_result_error_serialization() {
490 let entry = AuditEntry {
491 timestamp: "0".into(),
492 tool: "shell".into(),
493 command: "bad".into(),
494 result: AuditResult::Error {
495 message: "exec failed".into(),
496 },
497 duration_ms: 0,
498 error_category: None,
499 error_domain: None,
500 error_phase: None,
501 claim_source: None,
502 mcp_server_id: None,
503 injection_flagged: false,
504 embedding_anomalous: false,
505 cross_boundary_mcp_to_acp: false,
506 adversarial_policy_decision: None,
507 exit_code: None,
508 truncated: false,
509 policy_match: None,
510 correlation_id: None,
511 caller_id: None,
512 vigil_risk: None,
513 execution_env: None,
514 resolved_cwd: None,
515 scope_at_definition: None,
516 scope_at_dispatch: None,
517 };
518 let json = serde_json::to_string(&entry).unwrap();
519 assert!(json.contains("\"type\":\"error\""));
520 }
521
522 #[test]
523 fn audit_result_timeout_serialization() {
524 let entry = AuditEntry {
525 timestamp: "0".into(),
526 tool: "shell".into(),
527 command: "sleep 999".into(),
528 result: AuditResult::Timeout,
529 duration_ms: 30000,
530 error_category: Some("timeout".to_owned()),
531 error_domain: Some("system".to_owned()),
532 error_phase: None,
533 claim_source: None,
534 mcp_server_id: None,
535 injection_flagged: false,
536 embedding_anomalous: false,
537 cross_boundary_mcp_to_acp: false,
538 adversarial_policy_decision: None,
539 exit_code: None,
540 truncated: false,
541 policy_match: None,
542 correlation_id: None,
543 caller_id: None,
544 vigil_risk: None,
545 execution_env: None,
546 resolved_cwd: None,
547 scope_at_definition: None,
548 scope_at_dispatch: None,
549 };
550 let json = serde_json::to_string(&entry).unwrap();
551 assert!(json.contains("\"type\":\"timeout\""));
552 }
553
554 #[tokio::test]
555 async fn audit_logger_stdout() {
556 let config = AuditConfig {
557 enabled: true,
558 destination: crate::config::AuditDestination::Stdout,
559 ..Default::default()
560 };
561 let logger = AuditLogger::from_config(&config, false).await.unwrap();
562 let entry = AuditEntry {
563 timestamp: "0".into(),
564 tool: "shell".into(),
565 command: "echo test".into(),
566 result: AuditResult::Success,
567 duration_ms: 1,
568 error_category: None,
569 error_domain: None,
570 error_phase: None,
571 claim_source: None,
572 mcp_server_id: None,
573 injection_flagged: false,
574 embedding_anomalous: false,
575 cross_boundary_mcp_to_acp: false,
576 adversarial_policy_decision: None,
577 exit_code: None,
578 truncated: false,
579 policy_match: None,
580 correlation_id: None,
581 caller_id: None,
582 vigil_risk: None,
583 execution_env: None,
584 resolved_cwd: None,
585 scope_at_definition: None,
586 scope_at_dispatch: None,
587 };
588 logger.log(&entry).await;
589 }
590
591 #[tokio::test]
592 async fn audit_logger_file() {
593 let dir = tempfile::tempdir().unwrap();
594 let path = dir.path().join("audit.log");
595 let config = AuditConfig {
596 enabled: true,
597 destination: crate::config::AuditDestination::File(path.clone()),
598 ..Default::default()
599 };
600 let logger = AuditLogger::from_config(&config, false).await.unwrap();
601 let entry = AuditEntry {
602 timestamp: "0".into(),
603 tool: "shell".into(),
604 command: "echo test".into(),
605 result: AuditResult::Success,
606 duration_ms: 1,
607 error_category: None,
608 error_domain: None,
609 error_phase: None,
610 claim_source: None,
611 mcp_server_id: None,
612 injection_flagged: false,
613 embedding_anomalous: false,
614 cross_boundary_mcp_to_acp: false,
615 adversarial_policy_decision: None,
616 exit_code: None,
617 truncated: false,
618 policy_match: None,
619 correlation_id: None,
620 caller_id: None,
621 vigil_risk: None,
622 execution_env: None,
623 resolved_cwd: None,
624 scope_at_definition: None,
625 scope_at_dispatch: None,
626 };
627 logger.log(&entry).await;
628
629 let content = tokio::fs::read_to_string(&path).await.unwrap();
630 assert!(content.contains("\"tool\":\"shell\""));
631 }
632
633 #[tokio::test]
634 async fn audit_logger_file_write_error_logged() {
635 let config = AuditConfig {
636 enabled: true,
637 destination: crate::config::AuditDestination::File("/nonexistent/dir/audit.log".into()),
638 ..Default::default()
639 };
640 let result = AuditLogger::from_config(&config, false).await;
641 assert!(result.is_err());
642 }
643
644 #[test]
645 fn claim_source_serde_roundtrip() {
646 use crate::executor::ClaimSource;
647 let cases = [
648 (ClaimSource::Shell, "\"shell\""),
649 (ClaimSource::FileSystem, "\"file_system\""),
650 (ClaimSource::WebScrape, "\"web_scrape\""),
651 (ClaimSource::Mcp, "\"mcp\""),
652 (ClaimSource::A2a, "\"a2a\""),
653 (ClaimSource::CodeSearch, "\"code_search\""),
654 (ClaimSource::Diagnostics, "\"diagnostics\""),
655 (ClaimSource::Memory, "\"memory\""),
656 ];
657 for (variant, expected_json) in cases {
658 let serialized = serde_json::to_string(&variant).unwrap();
659 assert_eq!(serialized, expected_json, "serialize {variant:?}");
660 let deserialized: ClaimSource = serde_json::from_str(&serialized).unwrap();
661 assert_eq!(deserialized, variant, "deserialize {variant:?}");
662 }
663 }
664
665 #[test]
666 fn audit_entry_claim_source_none_omitted() {
667 let entry = AuditEntry {
668 timestamp: "0".into(),
669 tool: "shell".into(),
670 command: "echo".into(),
671 result: AuditResult::Success,
672 duration_ms: 1,
673 error_category: None,
674 error_domain: None,
675 error_phase: None,
676 claim_source: None,
677 mcp_server_id: None,
678 injection_flagged: false,
679 embedding_anomalous: false,
680 cross_boundary_mcp_to_acp: false,
681 adversarial_policy_decision: None,
682 exit_code: None,
683 truncated: false,
684 policy_match: None,
685 correlation_id: None,
686 caller_id: None,
687 vigil_risk: None,
688 execution_env: None,
689 resolved_cwd: None,
690 scope_at_definition: None,
691 scope_at_dispatch: None,
692 };
693 let json = serde_json::to_string(&entry).unwrap();
694 assert!(
695 !json.contains("claim_source"),
696 "claim_source must be omitted when None: {json}"
697 );
698 }
699
700 #[test]
701 fn audit_entry_claim_source_some_present() {
702 use crate::executor::ClaimSource;
703 let entry = AuditEntry {
704 timestamp: "0".into(),
705 tool: "shell".into(),
706 command: "echo".into(),
707 result: AuditResult::Success,
708 duration_ms: 1,
709 error_category: None,
710 error_domain: None,
711 error_phase: None,
712 claim_source: Some(ClaimSource::Shell),
713 mcp_server_id: None,
714 injection_flagged: false,
715 embedding_anomalous: false,
716 cross_boundary_mcp_to_acp: false,
717 adversarial_policy_decision: None,
718 exit_code: None,
719 truncated: false,
720 policy_match: None,
721 correlation_id: None,
722 caller_id: None,
723 vigil_risk: None,
724 execution_env: None,
725 resolved_cwd: None,
726 scope_at_definition: None,
727 scope_at_dispatch: None,
728 };
729 let json = serde_json::to_string(&entry).unwrap();
730 assert!(
731 json.contains("\"claim_source\":\"shell\""),
732 "expected claim_source=shell in JSON: {json}"
733 );
734 }
735
736 #[tokio::test]
737 async fn audit_logger_multiple_entries() {
738 let dir = tempfile::tempdir().unwrap();
739 let path = dir.path().join("audit.log");
740 let config = AuditConfig {
741 enabled: true,
742 destination: crate::config::AuditDestination::File(path.clone()),
743 ..Default::default()
744 };
745 let logger = AuditLogger::from_config(&config, false).await.unwrap();
746
747 for i in 0..5 {
748 let entry = AuditEntry {
749 timestamp: i.to_string(),
750 tool: "shell".into(),
751 command: format!("cmd{i}"),
752 result: AuditResult::Success,
753 duration_ms: i,
754 error_category: None,
755 error_domain: None,
756 error_phase: None,
757 claim_source: None,
758 mcp_server_id: None,
759 injection_flagged: false,
760 embedding_anomalous: false,
761 cross_boundary_mcp_to_acp: false,
762 adversarial_policy_decision: None,
763 exit_code: None,
764 truncated: false,
765 policy_match: None,
766 correlation_id: None,
767 caller_id: None,
768 vigil_risk: None,
769 execution_env: None,
770 resolved_cwd: None,
771 scope_at_definition: None,
772 scope_at_dispatch: None,
773 };
774 logger.log(&entry).await;
775 }
776
777 let content = tokio::fs::read_to_string(&path).await.unwrap();
778 assert_eq!(content.lines().count(), 5);
779 }
780
781 #[test]
782 fn audit_entry_exit_code_serialized() {
783 let entry = AuditEntry {
784 timestamp: "0".into(),
785 tool: "shell".into(),
786 command: "echo hi".into(),
787 result: AuditResult::Success,
788 duration_ms: 5,
789 error_category: None,
790 error_domain: None,
791 error_phase: None,
792 claim_source: None,
793 mcp_server_id: None,
794 injection_flagged: false,
795 embedding_anomalous: false,
796 cross_boundary_mcp_to_acp: false,
797 adversarial_policy_decision: None,
798 exit_code: Some(0),
799 truncated: false,
800 policy_match: None,
801 correlation_id: None,
802 caller_id: None,
803 vigil_risk: None,
804 execution_env: None,
805 resolved_cwd: None,
806 scope_at_definition: None,
807 scope_at_dispatch: None,
808 };
809 let json = serde_json::to_string(&entry).unwrap();
810 assert!(
811 json.contains("\"exit_code\":0"),
812 "exit_code must be serialized: {json}"
813 );
814 }
815
816 #[test]
817 fn audit_entry_exit_code_none_omitted() {
818 let entry = AuditEntry {
819 timestamp: "0".into(),
820 tool: "file".into(),
821 command: "read /tmp/x".into(),
822 result: AuditResult::Success,
823 duration_ms: 1,
824 error_category: None,
825 error_domain: None,
826 error_phase: None,
827 claim_source: None,
828 mcp_server_id: None,
829 injection_flagged: false,
830 embedding_anomalous: false,
831 cross_boundary_mcp_to_acp: false,
832 adversarial_policy_decision: None,
833 exit_code: None,
834 truncated: false,
835 policy_match: None,
836 correlation_id: None,
837 caller_id: None,
838 vigil_risk: None,
839 execution_env: None,
840 resolved_cwd: None,
841 scope_at_definition: None,
842 scope_at_dispatch: None,
843 };
844 let json = serde_json::to_string(&entry).unwrap();
845 assert!(
846 !json.contains("exit_code"),
847 "exit_code None must be omitted: {json}"
848 );
849 }
850
851 #[test]
852 fn log_tool_risk_summary_does_not_panic() {
853 log_tool_risk_summary(&[
854 "shell",
855 "bash",
856 "exec",
857 "web_scrape",
858 "fetch",
859 "scrape_page",
860 "file_write",
861 "file_read",
862 "file_delete",
863 "memory_search",
864 "unknown_tool",
865 ]);
866 }
867
868 #[test]
869 fn log_tool_risk_summary_empty_input_does_not_panic() {
870 log_tool_risk_summary(&[]);
871 }
872}