Skip to main content

envoy/
audit.rs

1use crate::error::Result;
2use crate::event::bus::EventBus;
3use crate::event::{EnvoyEvent, EventSeverity, EventType};
4use crate::message::MessageType;
5
6pub const AUDIT_PROJECT: &str = "_envoy_audit";
7
8/// Lightweight audit logging that reuses the event bus.
9/// All audit records are stored as events in the reserved `_envoy_audit` project.
10pub struct AuditStore {
11    event_bus: EventBus,
12}
13
14impl Default for AuditStore {
15    fn default() -> Self {
16        Self::new()
17    }
18}
19
20impl AuditStore {
21    pub fn new() -> Self {
22        Self {
23            event_bus: EventBus::new(),
24        }
25    }
26
27    /// Log that a message was sent.
28    pub fn log_message(
29        &self,
30        graph: &sqlitegraph::SqliteGraph,
31        from: &str,
32        to: &str,
33        msg_type: MessageType,
34        msg_id: &str,
35        task_id: Option<&str>,
36    ) -> Result<()> {
37        let mut data = serde_json::json!({
38            "agent_id": from,
39            "target": to,
40            "msg_type": msg_type.as_str(),
41            "msg_id": msg_id,
42        });
43        if let Some(tid) = task_id {
44            data["task_id"] = serde_json::json!(tid);
45        }
46        let _ = self.event_bus.ingest(
47            graph,
48            AUDIT_PROJECT.to_string(),
49            EventType::AuditLog,
50            EventSeverity::Info,
51            "message_sent".to_string(),
52            format!("Agent {} sent {:?} message to {}", from, msg_type, to),
53            data,
54        )?;
55        Ok(())
56    }
57
58    /// Log that an event was ingested.
59    pub fn log_event_ingested(
60        &self,
61        graph: &sqlitegraph::SqliteGraph,
62        project: &str,
63        source: &str,
64        event_type: EventType,
65    ) -> Result<()> {
66        let _ = self.event_bus.ingest(
67            graph,
68            AUDIT_PROJECT.to_string(),
69            EventType::AuditLog,
70            EventSeverity::Info,
71            "event_ingested".to_string(),
72            format!(
73                "Event {} ingested from {} for project {}",
74                event_type.as_str(),
75                source,
76                project
77            ),
78            serde_json::json!({
79                "agent_id": source,
80                "target": project,
81                "event_type": event_type.as_str(),
82            }),
83        )?;
84        Ok(())
85    }
86
87    /// Log agent registration.
88    pub fn log_agent_registered(
89        &self,
90        graph: &sqlitegraph::SqliteGraph,
91        agent_id: &str,
92        name: &str,
93        kind: &str,
94    ) -> Result<()> {
95        let _ = self.event_bus.ingest(
96            graph,
97            AUDIT_PROJECT.to_string(),
98            EventType::AuditLog,
99            EventSeverity::Info,
100            "agent_registered".to_string(),
101            format!("Agent {} ({}) registered as {}", agent_id, name, kind),
102            serde_json::json!({
103                "agent_id": agent_id,
104                "name": name,
105                "kind": kind,
106            }),
107        )?;
108        Ok(())
109    }
110
111    /// Log agent disconnection.
112    pub fn log_agent_disconnected(
113        &self,
114        graph: &sqlitegraph::SqliteGraph,
115        agent_id: &str,
116    ) -> Result<()> {
117        let _ = self.event_bus.ingest(
118            graph,
119            AUDIT_PROJECT.to_string(),
120            EventType::AuditLog,
121            EventSeverity::Info,
122            "agent_disconnected".to_string(),
123            format!("Agent {} disconnected", agent_id),
124            serde_json::json!({
125                "agent_id": agent_id,
126            }),
127        )?;
128        Ok(())
129    }
130
131    /// Log circuit breaker opened.
132    pub fn log_circuit_opened(
133        &self,
134        graph: &sqlitegraph::SqliteGraph,
135        agent_id: &str,
136        failure_count: u32,
137    ) -> Result<()> {
138        let _ = self.event_bus.ingest(
139            graph,
140            AUDIT_PROJECT.to_string(),
141            EventType::AuditLog,
142            EventSeverity::Warning,
143            "circuit_opened".to_string(),
144            format!(
145                "Circuit breaker opened for agent {} after {} failures",
146                agent_id, failure_count
147            ),
148            serde_json::json!({
149                "agent_id": agent_id,
150                "failure_count": failure_count,
151            }),
152        )?;
153        Ok(())
154    }
155
156    /// Log circuit breaker closed.
157    pub fn log_circuit_closed(
158        &self,
159        graph: &sqlitegraph::SqliteGraph,
160        agent_id: &str,
161    ) -> Result<()> {
162        let _ = self.event_bus.ingest(
163            graph,
164            AUDIT_PROJECT.to_string(),
165            EventType::AuditLog,
166            EventSeverity::Info,
167            "circuit_closed".to_string(),
168            format!("Circuit breaker closed for agent {}", agent_id),
169            serde_json::json!({
170                "agent_id": agent_id,
171            }),
172        )?;
173        Ok(())
174    }
175
176    /// Log a task claim.
177    pub fn log_task_claimed(
178        &self,
179        graph: &sqlitegraph::SqliteGraph,
180        task_id: &str,
181        agent_id: &str,
182    ) -> Result<()> {
183        let _ = self.event_bus.ingest(
184            graph,
185            AUDIT_PROJECT.to_string(),
186            EventType::AuditLog,
187            EventSeverity::Info,
188            "task_claimed".to_string(),
189            format!("Task {} claimed by agent {}", task_id, agent_id),
190            serde_json::json!({
191                "task_id": task_id,
192                "agent_id": agent_id,
193            }),
194        )?;
195        Ok(())
196    }
197
198    /// Query audit records.
199    pub fn query(
200        &self,
201        graph: &sqlitegraph::SqliteGraph,
202        agent_id: Option<&str>,
203        operation: Option<&str>,
204        task_id: Option<&str>,
205        since: Option<&str>,
206        limit: Option<i64>,
207    ) -> Result<Vec<EnvoyEvent>> {
208        let mut events = self.event_bus.query(graph, AUDIT_PROJECT, since, limit)?;
209
210        if let Some(agent_id) = agent_id {
211            events.retain(|e| {
212                e.data
213                    .get("agent_id")
214                    .and_then(|v| v.as_str())
215                    .is_some_and(|id| id == agent_id)
216            });
217        }
218
219        if let Some(operation) = operation {
220            events.retain(|e| e.source == operation);
221        }
222
223        if let Some(task_id) = task_id {
224            events.retain(|e| {
225                e.data
226                    .get("task_id")
227                    .and_then(|v| v.as_str())
228                    .is_some_and(|id| id == task_id)
229            });
230        }
231
232        Ok(events)
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239    use crate::engine::Engine;
240
241    #[test]
242    fn audit_log_message_roundtrips() {
243        let engine = Engine::open_in_memory().unwrap();
244        let graph = engine.graph();
245        let audit = AuditStore::new();
246
247        audit
248            .log_message(graph, "id1", "id2", MessageType::Direct, "msg-123", None)
249            .unwrap();
250
251        let records = audit
252            .query(graph, Some("id1"), None, None, None, None)
253            .unwrap();
254        assert_eq!(records.len(), 1);
255        assert_eq!(records[0].source, "message_sent");
256        assert_eq!(
257            records[0].data.get("agent_id").unwrap().as_str().unwrap(),
258            "id1"
259        );
260    }
261
262    #[test]
263    fn audit_filter_by_operation() {
264        let engine = Engine::open_in_memory().unwrap();
265        let graph = engine.graph();
266        let audit = AuditStore::new();
267
268        audit
269            .log_agent_registered(graph, "id1", "claude", "claude")
270            .unwrap();
271        audit.log_agent_disconnected(graph, "id1").unwrap();
272
273        let registered = audit
274            .query(
275                graph,
276                Some("id1"),
277                Some("agent_registered"),
278                None,
279                None,
280                None,
281            )
282            .unwrap();
283        assert_eq!(registered.len(), 1);
284
285        let all = audit
286            .query(graph, Some("id1"), None, None, None, None)
287            .unwrap();
288        assert_eq!(all.len(), 2);
289    }
290}