agentic_workflow/governance/
audit.rs1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7 AuditEvent, AuditEventType, AuditImpact, AuditOutcome, AuditQuery,
8 AuditRetention, WorkflowResult,
9};
10
11pub struct AuditEngine {
13 events: Vec<AuditEvent>,
14 retention: AuditRetention,
15}
16
17impl AuditEngine {
18 pub fn new() -> Self {
19 Self {
20 events: Vec::new(),
21 retention: AuditRetention {
22 retain_days: 90,
23 compliance_preset: None,
24 archive_after_days: Some(365),
25 },
26 }
27 }
28
29 pub fn record(
31 &mut self,
32 execution_id: &str,
33 workflow_id: &str,
34 step_id: Option<&str>,
35 event_type: AuditEventType,
36 actor: &str,
37 resource: Option<&str>,
38 input: Option<serde_json::Value>,
39 output: Option<serde_json::Value>,
40 outcome: AuditOutcome,
41 ) -> String {
42 let event_id = Uuid::new_v4().to_string();
43 let event = AuditEvent {
44 event_id: event_id.clone(),
45 execution_id: execution_id.to_string(),
46 workflow_id: workflow_id.to_string(),
47 step_id: step_id.map(|s| s.to_string()),
48 event_type,
49 actor: actor.to_string(),
50 timestamp: Utc::now(),
51 resource: resource.map(|s| s.to_string()),
52 input,
53 output,
54 outcome,
55 metadata: HashMap::new(),
56 };
57
58 self.events.push(event);
59 event_id
60 }
61
62 pub fn query(&self, q: &AuditQuery) -> Vec<&AuditEvent> {
64 self.events
65 .iter()
66 .filter(|e| {
67 if let Some(wid) = &q.workflow_id {
68 if &e.workflow_id != wid {
69 return false;
70 }
71 }
72 if let Some(eid) = &q.execution_id {
73 if &e.execution_id != eid {
74 return false;
75 }
76 }
77 if let Some(actor) = &q.actor {
78 if &e.actor != actor {
79 return false;
80 }
81 }
82 if let Some(resource) = &q.resource {
83 if e.resource.as_deref() != Some(resource) {
84 return false;
85 }
86 }
87 if let Some(from) = &q.from {
88 if e.timestamp < *from {
89 return false;
90 }
91 }
92 if let Some(to) = &q.to {
93 if e.timestamp > *to {
94 return false;
95 }
96 }
97 true
98 })
99 .take(q.limit.unwrap_or(1000))
100 .collect()
101 }
102
103 pub fn timeline(
105 &self,
106 execution_id: Option<&str>,
107 limit: usize,
108 ) -> Vec<&AuditEvent> {
109 let mut events: Vec<&AuditEvent> = match execution_id {
110 Some(eid) => self
111 .events
112 .iter()
113 .filter(|e| e.execution_id == eid)
114 .collect(),
115 None => self.events.iter().collect(),
116 };
117
118 events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
119 events.truncate(limit);
120 events
121 }
122
123 pub fn impact_analysis(&self, resource: &str) -> AuditImpact {
125 let matching: Vec<&AuditEvent> = self
126 .events
127 .iter()
128 .filter(|e| e.resource.as_deref() == Some(resource))
129 .collect();
130
131 let workflow_ids: Vec<String> = matching
132 .iter()
133 .map(|e| e.workflow_id.clone())
134 .collect::<std::collections::HashSet<_>>()
135 .into_iter()
136 .collect();
137
138 let execution_ids: Vec<String> = matching
139 .iter()
140 .map(|e| e.execution_id.clone())
141 .collect::<std::collections::HashSet<_>>()
142 .into_iter()
143 .collect();
144
145 let first_touch = matching.iter().map(|e| e.timestamp).min().unwrap_or_else(Utc::now);
146 let last_touch = matching.iter().map(|e| e.timestamp).max().unwrap_or_else(Utc::now);
147
148 AuditImpact {
149 resource: resource.to_string(),
150 workflow_ids,
151 execution_ids,
152 event_count: matching.len(),
153 first_touch,
154 last_touch,
155 }
156 }
157
158 pub fn export(&self, query: &AuditQuery) -> WorkflowResult<String> {
160 let events = self.query(query);
161 serde_json::to_string_pretty(&events)
162 .map_err(|e| crate::types::WorkflowError::SerializationError(e.to_string()))
163 }
164
165 pub fn set_retention(&mut self, retention: AuditRetention) {
167 self.retention = retention;
168 }
169
170 pub fn get_retention(&self) -> &AuditRetention {
172 &self.retention
173 }
174
175 pub fn event_count(&self) -> usize {
177 self.events.len()
178 }
179}
180
181impl Default for AuditEngine {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn test_audit_record_and_query() {
193 let mut engine = AuditEngine::new();
194
195 engine.record(
196 "exec-1", "wf-1", Some("step-1"),
197 AuditEventType::StepExecuted,
198 "system", Some("billing-db"),
199 None, None, AuditOutcome::Success,
200 );
201
202 engine.record(
203 "exec-2", "wf-2", None,
204 AuditEventType::WorkflowStarted,
205 "user-a", None,
206 None, None, AuditOutcome::Success,
207 );
208
209 let q = AuditQuery {
210 workflow_id: Some("wf-1".to_string()),
211 execution_id: None,
212 event_types: None,
213 actor: None,
214 resource: None,
215 from: None,
216 to: None,
217 limit: None,
218 };
219
220 let results = engine.query(&q);
221 assert_eq!(results.len(), 1);
222
223 let impact = engine.impact_analysis("billing-db");
224 assert_eq!(impact.event_count, 1);
225 }
226}