// agentic_workflow/governance/audit.rs

1use std::collections::HashMap;
2
3use chrono::Utc;
4use uuid::Uuid;
5
6use crate::types::{
7    AuditEvent, AuditEventType, AuditImpact, AuditOutcome, AuditQuery,
8    AuditRetention, WorkflowResult,
9};
10
11/// Structured, queryable audit trail engine.
12pub struct AuditEngine {
13    events: Vec<AuditEvent>,
14    retention: AuditRetention,
15}
16
17impl AuditEngine {
18    pub fn new() -> Self {
19        Self {
20            events: Vec::new(),
21            retention: AuditRetention {
22                retain_days: 90,
23                compliance_preset: None,
24                archive_after_days: Some(365),
25            },
26        }
27    }
28
29    /// Record an audit event.
30    pub fn record(
31        &mut self,
32        execution_id: &str,
33        workflow_id: &str,
34        step_id: Option<&str>,
35        event_type: AuditEventType,
36        actor: &str,
37        resource: Option<&str>,
38        input: Option<serde_json::Value>,
39        output: Option<serde_json::Value>,
40        outcome: AuditOutcome,
41    ) -> String {
42        let event_id = Uuid::new_v4().to_string();
43        let event = AuditEvent {
44            event_id: event_id.clone(),
45            execution_id: execution_id.to_string(),
46            workflow_id: workflow_id.to_string(),
47            step_id: step_id.map(|s| s.to_string()),
48            event_type,
49            actor: actor.to_string(),
50            timestamp: Utc::now(),
51            resource: resource.map(|s| s.to_string()),
52            input,
53            output,
54            outcome,
55            metadata: HashMap::new(),
56        };
57
58        self.events.push(event);
59        event_id
60    }
61
62    /// Query the audit trail.
63    pub fn query(&self, q: &AuditQuery) -> Vec<&AuditEvent> {
64        self.events
65            .iter()
66            .filter(|e| {
67                if let Some(wid) = &q.workflow_id {
68                    if &e.workflow_id != wid {
69                        return false;
70                    }
71                }
72                if let Some(eid) = &q.execution_id {
73                    if &e.execution_id != eid {
74                        return false;
75                    }
76                }
77                if let Some(actor) = &q.actor {
78                    if &e.actor != actor {
79                        return false;
80                    }
81                }
82                if let Some(resource) = &q.resource {
83                    if e.resource.as_deref() != Some(resource) {
84                        return false;
85                    }
86                }
87                if let Some(from) = &q.from {
88                    if e.timestamp < *from {
89                        return false;
90                    }
91                }
92                if let Some(to) = &q.to {
93                    if e.timestamp > *to {
94                        return false;
95                    }
96                }
97                true
98            })
99            .take(q.limit.unwrap_or(1000))
100            .collect()
101    }
102
103    /// Get chronological timeline.
104    pub fn timeline(
105        &self,
106        execution_id: Option<&str>,
107        limit: usize,
108    ) -> Vec<&AuditEvent> {
109        let mut events: Vec<&AuditEvent> = match execution_id {
110            Some(eid) => self
111                .events
112                .iter()
113                .filter(|e| e.execution_id == eid)
114                .collect(),
115            None => self.events.iter().collect(),
116        };
117
118        events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
119        events.truncate(limit);
120        events
121    }
122
123    /// Find all workflows that touched a resource.
124    pub fn impact_analysis(&self, resource: &str) -> AuditImpact {
125        let matching: Vec<&AuditEvent> = self
126            .events
127            .iter()
128            .filter(|e| e.resource.as_deref() == Some(resource))
129            .collect();
130
131        let workflow_ids: Vec<String> = matching
132            .iter()
133            .map(|e| e.workflow_id.clone())
134            .collect::<std::collections::HashSet<_>>()
135            .into_iter()
136            .collect();
137
138        let execution_ids: Vec<String> = matching
139            .iter()
140            .map(|e| e.execution_id.clone())
141            .collect::<std::collections::HashSet<_>>()
142            .into_iter()
143            .collect();
144
145        let first_touch = matching.iter().map(|e| e.timestamp).min().unwrap_or_else(Utc::now);
146        let last_touch = matching.iter().map(|e| e.timestamp).max().unwrap_or_else(Utc::now);
147
148        AuditImpact {
149            resource: resource.to_string(),
150            workflow_ids,
151            execution_ids,
152            event_count: matching.len(),
153            first_touch,
154            last_touch,
155        }
156    }
157
158    /// Export audit trail as JSON.
159    pub fn export(&self, query: &AuditQuery) -> WorkflowResult<String> {
160        let events = self.query(query);
161        serde_json::to_string_pretty(&events)
162            .map_err(|e| crate::types::WorkflowError::SerializationError(e.to_string()))
163    }
164
165    /// Set retention policy.
166    pub fn set_retention(&mut self, retention: AuditRetention) {
167        self.retention = retention;
168    }
169
170    /// Get retention policy.
171    pub fn get_retention(&self) -> &AuditRetention {
172        &self.retention
173    }
174
175    /// Total event count.
176    pub fn event_count(&self) -> usize {
177        self.events.len()
178    }
179}
180
181impl Default for AuditEngine {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Records two events for different workflows, then checks that a
    /// workflow-id query and a resource impact analysis each see exactly one.
    #[test]
    fn test_audit_record_and_query() {
        let mut audit = AuditEngine::new();

        // Step-level event for wf-1 that touches the "billing-db" resource.
        audit.record(
            "exec-1",
            "wf-1",
            Some("step-1"),
            AuditEventType::StepExecuted,
            "system",
            Some("billing-db"),
            None,
            None,
            AuditOutcome::Success,
        );

        // Workflow-level event for wf-2 with no step and no resource.
        audit.record(
            "exec-2",
            "wf-2",
            None,
            AuditEventType::WorkflowStarted,
            "user-a",
            None,
            None,
            None,
            AuditOutcome::Success,
        );

        // Only the workflow id is constrained; every other filter is left open.
        let by_workflow = AuditQuery {
            workflow_id: Some("wf-1".to_string()),
            execution_id: None,
            event_types: None,
            actor: None,
            resource: None,
            from: None,
            to: None,
            limit: None,
        };

        assert_eq!(audit.query(&by_workflow).len(), 1);
        assert_eq!(audit.impact_analysis("billing-db").event_count, 1);
    }
}