1use std::collections::HashSet;
2
3use chrono::{DateTime, Utc};
4use sqlitegraph::GraphEntity;
5
6use crate::error::Result;
7use crate::event::{EnvoyEvent, EventSeverity, EventType, KIND_EVENT};
8
9pub const KIND_DELIVERY: &str = "EnvoyEventDelivery";
10
11pub struct DeliveryTracker;
13
14impl Default for DeliveryTracker {
15 fn default() -> Self {
16 Self::new()
17 }
18}
19
20impl DeliveryTracker {
21 pub fn new() -> Self {
22 Self
23 }
24
25 pub fn record_delivery(
27 &self,
28 graph: &sqlitegraph::SqliteGraph,
29 agent_id: &str,
30 event_id: &str,
31 ) -> Result<()> {
32 let name = format!("dlv-{}-{}", agent_id, event_id);
33 if graph
34 .find_entity_by_kind_and_name(KIND_DELIVERY, &name)?
35 .is_some()
36 {
37 return Ok(());
38 }
39 let now = chrono::Utc::now().to_rfc3339();
40 let entity = GraphEntity {
41 id: 0,
42 kind: KIND_DELIVERY.to_string(),
43 name,
44 file_path: None,
45 data: serde_json::json!({
46 "agent_id": agent_id,
47 "event_id": event_id,
48 "delivered_at": now,
49 }),
50 };
51 graph.insert_entity(&entity)?;
52 Ok(())
53 }
54
55 pub fn get_undelivered(
57 &self,
58 graph: &sqlitegraph::SqliteGraph,
59 agent_id: &str,
60 project: &str,
61 limit: Option<i64>,
62 ) -> Result<Vec<EnvoyEvent>> {
63 let events = graph.find_entities_by_kind(KIND_EVENT)?;
64 let deliveries = graph.find_entities_by_kind(KIND_DELIVERY)?;
65
66 let delivered_ids: HashSet<String> = deliveries
67 .iter()
68 .filter(|d| read_str(&d.data, "agent_id") == agent_id)
69 .map(|d| read_str(&d.data, "event_id"))
70 .collect();
71
72 let mut undelivered: Vec<EnvoyEvent> = events
73 .iter()
74 .filter(|e| read_str(&e.data, "project") == project)
75 .filter(|e| !delivered_ids.contains(&e.id.to_string()))
76 .filter_map(|e| entity_to_event(e).ok())
77 .collect();
78
79 undelivered.sort_by_key(|a| a.timestamp);
80 if let Some(limit) = limit {
81 undelivered.truncate(limit as usize);
82 }
83 Ok(undelivered)
84 }
85
86 pub fn purge_deliveries(&self, graph: &sqlitegraph::SqliteGraph) -> Result<usize> {
88 let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
89 let deliveries = graph.find_entities_by_kind(KIND_DELIVERY)?;
90 let mut purged = 0usize;
91 for d in &deliveries {
92 let ts = read_str(&d.data, "delivered_at");
93 if let Ok(dt) = DateTime::parse_from_rfc3339(&ts) {
94 if dt.with_timezone(&Utc) < cutoff {
95 match graph.delete_entity(d.id) {
96 Ok(()) => purged += 1,
97 Err(e) => eprintln!("warn: failed to purge delivery {}: {}", d.id, e),
98 }
99 }
100 }
101 }
102 Ok(purged)
103 }
104}
105
106pub struct EventBus;
107
108impl Default for EventBus {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl EventBus {
115 pub fn new() -> Self {
116 Self
117 }
118
119 #[allow(clippy::too_many_arguments)]
120 pub fn ingest(
121 &self,
122 graph: &sqlitegraph::SqliteGraph,
123 project: String,
124 event_type: EventType,
125 severity: EventSeverity,
126 source: String,
127 message: String,
128 data: serde_json::Value,
129 ) -> Result<EnvoyEvent> {
130 let timestamp = chrono::Utc::now();
131 let name = format!("evt-{}", uuid::Uuid::new_v4());
132 let entity = GraphEntity {
133 id: 0,
134 kind: KIND_EVENT.to_string(),
135 name,
136 file_path: None,
137 data: serde_json::json!({
138 "project": project,
139 "event_type": event_type.as_str(),
140 "severity": severity.as_str(),
141 "source": source,
142 "message": message,
143 "data": data,
144 "timestamp": timestamp.to_rfc3339(),
145 }),
146 };
147 let id = graph.insert_entity(&entity)?;
148
149 Ok(EnvoyEvent {
150 id: id.to_string(),
151 project,
152 event_type,
153 severity,
154 source,
155 message,
156 data,
157 timestamp,
158 })
159 }
160
161 pub fn query(
162 &self,
163 graph: &sqlitegraph::SqliteGraph,
164 project: &str,
165 since: Option<&str>,
166 limit: Option<i64>,
167 ) -> Result<Vec<EnvoyEvent>> {
168 let since_dt: Option<DateTime<Utc>> = since.and_then(|s| {
169 DateTime::parse_from_rfc3339(s)
170 .ok()
171 .map(|dt| dt.with_timezone(&Utc))
172 });
173 let entities = graph.find_entities_by_kind(KIND_EVENT)?;
174 let mut events: Vec<EnvoyEvent> = entities
175 .iter()
176 .filter(|e| read_str(&e.data, "project") == project)
177 .filter(|e| since_dt.is_none_or(|since| parse_ts(&e.data).is_some_and(|ts| ts > since)))
178 .filter_map(|e| entity_to_event(e).ok())
179 .collect();
180 events.sort_by_key(|b| std::cmp::Reverse(b.timestamp));
181 if let Some(limit) = limit {
182 events.truncate(limit as usize);
183 }
184 Ok(events)
185 }
186
187 pub fn purge_old_events(&self, graph: &sqlitegraph::SqliteGraph) -> Result<usize> {
188 let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
189 let entities = graph.find_entities_by_kind(KIND_EVENT)?;
190 let mut purged = 0usize;
191 for e in &entities {
192 if parse_ts(&e.data).is_some_and(|ts| ts < cutoff) {
193 match graph.delete_entity(e.id) {
194 Ok(()) => purged += 1,
195 Err(err) => eprintln!("warn: failed to purge event {}: {}", e.id, err),
196 }
197 }
198 }
199 Ok(purged)
200 }
201}
202
203fn entity_to_event(entity: &sqlitegraph::GraphEntity) -> Result<EnvoyEvent> {
204 let ts_str = read_str(&entity.data, "timestamp");
205 let timestamp = DateTime::parse_from_rfc3339(&ts_str)
206 .map(|dt| dt.with_timezone(&Utc))
207 .unwrap_or_else(|_| Utc::now());
208 Ok(EnvoyEvent {
209 id: entity.id.to_string(),
210 project: read_str(&entity.data, "project"),
211 event_type: read_str(&entity.data, "event_type")
212 .parse()
213 .unwrap_or(EventType::HookResult),
214 severity: match read_str(&entity.data, "severity").as_str() {
215 "warning" => EventSeverity::Warning,
216 "blocking" => EventSeverity::Blocking,
217 _ => EventSeverity::Info,
218 },
219 source: read_str(&entity.data, "source"),
220 message: read_str(&entity.data, "message"),
221 data: entity
222 .data
223 .get("data")
224 .cloned()
225 .unwrap_or(serde_json::Value::Null),
226 timestamp,
227 })
228}
229
230fn read_str(data: &serde_json::Value, key: &str) -> String {
231 data.get(key)
232 .and_then(|v| v.as_str())
233 .unwrap_or("")
234 .to_string()
235}
236
237fn parse_ts(data: &serde_json::Value) -> Option<DateTime<Utc>> {
238 DateTime::parse_from_rfc3339(&read_str(data, "timestamp"))
239 .ok()
240 .map(|dt| dt.with_timezone(&Utc))
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::Engine;

    /// Events of project "magellan" not yet delivered to `agent`, uncapped.
    fn pending_magellan(
        tracker: &DeliveryTracker,
        graph: &sqlitegraph::SqliteGraph,
        agent: &str,
    ) -> Vec<EnvoyEvent> {
        tracker
            .get_undelivered(graph, agent, "magellan", None)
            .unwrap()
    }

    #[test]
    fn delivery_tracker_records_and_queries_undelivered() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let tracker = DeliveryTracker::new();

        let ingested = EventBus::new()
            .ingest(
                graph,
                String::from("magellan"),
                EventType::CiStatus,
                EventSeverity::Info,
                String::from("ci"),
                String::from("test"),
                serde_json::json!({}),
            )
            .unwrap();

        // Nothing recorded yet: the event is pending for agent-1.
        let before = pending_magellan(&tracker, graph, "agent-1");
        assert_eq!(before.len(), 1);
        assert_eq!(before[0].id, ingested.id);

        tracker
            .record_delivery(graph, "agent-1", &ingested.id)
            .unwrap();

        // Delivered to agent-1 only; agent-2 still sees it.
        assert!(pending_magellan(&tracker, graph, "agent-1").is_empty());
        assert_eq!(pending_magellan(&tracker, graph, "agent-2").len(), 1);
    }

    #[test]
    fn delivery_tracker_respects_project_boundary() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let tracker = DeliveryTracker::new();

        EventBus::new()
            .ingest(
                graph,
                String::from("envoy"),
                EventType::DocSync,
                EventSeverity::Info,
                String::from("doc"),
                String::from("test"),
                serde_json::json!({}),
            )
            .unwrap();

        // The only event belongs to "envoy", so "magellan" has nothing pending.
        assert!(pending_magellan(&tracker, graph, "agent-1").is_empty());
    }

    #[test]
    fn ingest_and_query_events() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let bus = EventBus::new();

        bus.ingest(
            graph,
            String::from("magellan"),
            EventType::HookResult,
            EventSeverity::Warning,
            String::from("hook:stub"),
            String::from("stub found"),
            serde_json::json!({"hook": "stub-check"}),
        )
        .unwrap();
        bus.ingest(
            graph,
            String::from("magellan"),
            EventType::CiStatus,
            EventSeverity::Info,
            String::from("ci:github"),
            String::from("CI green"),
            serde_json::json!({"run_id": "123"}),
        )
        .unwrap();

        let found = bus.query(graph, "magellan", None, None).unwrap();
        assert_eq!(found.len(), 2);
    }

    #[test]
    fn filtered_by_project() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let bus = EventBus::new();

        bus.ingest(
            graph,
            String::from("envoy"),
            EventType::DocSync,
            EventSeverity::Info,
            String::from("doc:wiki"),
            String::from("updated"),
            serde_json::json!({}),
        )
        .unwrap();

        let magellan_events = bus.query(graph, "magellan", None, None).unwrap();
        assert!(magellan_events.is_empty());

        let envoy_events = bus.query(graph, "envoy", None, None).unwrap();
        assert_eq!(envoy_events.len(), 1);
    }

    #[test]
    fn purge_old_events() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let bus = EventBus::new();

        bus.ingest(
            graph,
            String::from("magellan"),
            EventType::DocSync,
            EventSeverity::Info,
            String::from("test"),
            String::from("test"),
            serde_json::json!({}),
        )
        .unwrap();

        // A freshly ingested event is inside the retention window.
        let removed = bus.purge_old_events(graph).unwrap();
        assert_eq!(removed, 0);
    }
}