1use crate::error::Result;
2use crate::event::bus::EventBus;
3use crate::event::{EnvoyEvent, EventSeverity, EventType};
4use crate::message::MessageType;
5
/// Project name under which every audit event in this module is ingested and queried.
pub const AUDIT_PROJECT: &str = "_envoy_audit";
7
8pub struct AuditStore {
11 event_bus: EventBus,
12}
13
14impl Default for AuditStore {
15 fn default() -> Self {
16 Self::new()
17 }
18}
19
20impl AuditStore {
21 pub fn new() -> Self {
22 Self {
23 event_bus: EventBus::new(),
24 }
25 }
26
27 pub fn log_message(
29 &self,
30 graph: &sqlitegraph::SqliteGraph,
31 from: &str,
32 to: &str,
33 msg_type: MessageType,
34 msg_id: &str,
35 task_id: Option<&str>,
36 ) -> Result<()> {
37 let mut data = serde_json::json!({
38 "agent_id": from,
39 "target": to,
40 "msg_type": msg_type.as_str(),
41 "msg_id": msg_id,
42 });
43 if let Some(tid) = task_id {
44 data["task_id"] = serde_json::json!(tid);
45 }
46 let _ = self.event_bus.ingest(
47 graph,
48 AUDIT_PROJECT.to_string(),
49 EventType::AuditLog,
50 EventSeverity::Info,
51 "message_sent".to_string(),
52 format!("Agent {} sent {:?} message to {}", from, msg_type, to),
53 data,
54 )?;
55 Ok(())
56 }
57
58 pub fn log_event_ingested(
60 &self,
61 graph: &sqlitegraph::SqliteGraph,
62 project: &str,
63 source: &str,
64 event_type: EventType,
65 ) -> Result<()> {
66 let _ = self.event_bus.ingest(
67 graph,
68 AUDIT_PROJECT.to_string(),
69 EventType::AuditLog,
70 EventSeverity::Info,
71 "event_ingested".to_string(),
72 format!(
73 "Event {} ingested from {} for project {}",
74 event_type.as_str(),
75 source,
76 project
77 ),
78 serde_json::json!({
79 "agent_id": source,
80 "target": project,
81 "event_type": event_type.as_str(),
82 }),
83 )?;
84 Ok(())
85 }
86
87 pub fn log_agent_registered(
89 &self,
90 graph: &sqlitegraph::SqliteGraph,
91 agent_id: &str,
92 name: &str,
93 kind: &str,
94 ) -> Result<()> {
95 let _ = self.event_bus.ingest(
96 graph,
97 AUDIT_PROJECT.to_string(),
98 EventType::AuditLog,
99 EventSeverity::Info,
100 "agent_registered".to_string(),
101 format!("Agent {} ({}) registered as {}", agent_id, name, kind),
102 serde_json::json!({
103 "agent_id": agent_id,
104 "name": name,
105 "kind": kind,
106 }),
107 )?;
108 Ok(())
109 }
110
111 pub fn log_agent_disconnected(
113 &self,
114 graph: &sqlitegraph::SqliteGraph,
115 agent_id: &str,
116 ) -> Result<()> {
117 let _ = self.event_bus.ingest(
118 graph,
119 AUDIT_PROJECT.to_string(),
120 EventType::AuditLog,
121 EventSeverity::Info,
122 "agent_disconnected".to_string(),
123 format!("Agent {} disconnected", agent_id),
124 serde_json::json!({
125 "agent_id": agent_id,
126 }),
127 )?;
128 Ok(())
129 }
130
131 pub fn log_circuit_opened(
133 &self,
134 graph: &sqlitegraph::SqliteGraph,
135 agent_id: &str,
136 failure_count: u32,
137 ) -> Result<()> {
138 let _ = self.event_bus.ingest(
139 graph,
140 AUDIT_PROJECT.to_string(),
141 EventType::AuditLog,
142 EventSeverity::Warning,
143 "circuit_opened".to_string(),
144 format!(
145 "Circuit breaker opened for agent {} after {} failures",
146 agent_id, failure_count
147 ),
148 serde_json::json!({
149 "agent_id": agent_id,
150 "failure_count": failure_count,
151 }),
152 )?;
153 Ok(())
154 }
155
156 pub fn log_circuit_closed(
158 &self,
159 graph: &sqlitegraph::SqliteGraph,
160 agent_id: &str,
161 ) -> Result<()> {
162 let _ = self.event_bus.ingest(
163 graph,
164 AUDIT_PROJECT.to_string(),
165 EventType::AuditLog,
166 EventSeverity::Info,
167 "circuit_closed".to_string(),
168 format!("Circuit breaker closed for agent {}", agent_id),
169 serde_json::json!({
170 "agent_id": agent_id,
171 }),
172 )?;
173 Ok(())
174 }
175
176 pub fn log_task_claimed(
178 &self,
179 graph: &sqlitegraph::SqliteGraph,
180 task_id: &str,
181 agent_id: &str,
182 ) -> Result<()> {
183 let _ = self.event_bus.ingest(
184 graph,
185 AUDIT_PROJECT.to_string(),
186 EventType::AuditLog,
187 EventSeverity::Info,
188 "task_claimed".to_string(),
189 format!("Task {} claimed by agent {}", task_id, agent_id),
190 serde_json::json!({
191 "task_id": task_id,
192 "agent_id": agent_id,
193 }),
194 )?;
195 Ok(())
196 }
197
198 pub fn query(
200 &self,
201 graph: &sqlitegraph::SqliteGraph,
202 agent_id: Option<&str>,
203 operation: Option<&str>,
204 task_id: Option<&str>,
205 since: Option<&str>,
206 limit: Option<i64>,
207 ) -> Result<Vec<EnvoyEvent>> {
208 let mut events = self.event_bus.query(graph, AUDIT_PROJECT, since, limit)?;
209
210 if let Some(agent_id) = agent_id {
211 events.retain(|e| {
212 e.data
213 .get("agent_id")
214 .and_then(|v| v.as_str())
215 .is_some_and(|id| id == agent_id)
216 });
217 }
218
219 if let Some(operation) = operation {
220 events.retain(|e| e.source == operation);
221 }
222
223 if let Some(task_id) = task_id {
224 events.retain(|e| {
225 e.data
226 .get("task_id")
227 .and_then(|v| v.as_str())
228 .is_some_and(|id| id == task_id)
229 });
230 }
231
232 Ok(events)
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::Engine;

    /// A logged message can be read back by filtering on the sending agent.
    #[test]
    fn audit_log_message_roundtrips() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let store = AuditStore::new();
        let sender = "id1";

        store
            .log_message(graph, sender, "id2", MessageType::Direct, "msg-123", None)
            .unwrap();

        let found = store
            .query(graph, Some(sender), None, None, None, None)
            .unwrap();
        assert_eq!(found.len(), 1);

        // The operation name is exposed as the event source, and the sender
        // is stored under the `agent_id` payload key.
        let entry = &found[0];
        assert_eq!(entry.source, "message_sent");
        assert_eq!(
            entry.data.get("agent_id").unwrap().as_str().unwrap(),
            sender
        );
    }

    /// The operation filter narrows the result; omitting it returns both entries.
    #[test]
    fn audit_filter_by_operation() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let store = AuditStore::new();
        let agent = "id1";

        // Two different operations for the same agent.
        store
            .log_agent_registered(graph, agent, "claude", "claude")
            .unwrap();
        store.log_agent_disconnected(graph, agent).unwrap();

        let only_registered = store
            .query(
                graph,
                Some(agent),
                Some("agent_registered"),
                None,
                None,
                None,
            )
            .unwrap();
        assert_eq!(only_registered.len(), 1);

        let everything = store
            .query(graph, Some(agent), None, None, None, None)
            .unwrap();
        assert_eq!(everything.len(), 2);
    }
}