// envoy/event/bus.rs

1use std::collections::HashSet;
2
3use chrono::{DateTime, Utc};
4use sqlitegraph::GraphEntity;
5
6use crate::error::Result;
7use crate::event::{EnvoyEvent, EventSeverity, EventType, KIND_EVENT};
8
/// Entity kind under which per-agent delivery records are stored in the graph.
pub const KIND_DELIVERY: &str = "EnvoyEventDelivery";
10
/// Per-agent event delivery bookkeeping, used so that an agent that was
/// offline can be replayed exactly the events it has not yet received.
///
/// The type carries no state of its own, so the derived `Default` yields the
/// same unit value as the `new()` constructor.
#[derive(Default)]
pub struct DeliveryTracker;
19
20impl DeliveryTracker {
21    pub fn new() -> Self {
22        Self
23    }
24
25    /// Record that an event was delivered to an agent.
26    pub fn record_delivery(
27        &self,
28        graph: &sqlitegraph::SqliteGraph,
29        agent_id: &str,
30        event_id: &str,
31    ) -> Result<()> {
32        let name = format!("dlv-{}-{}", agent_id, event_id);
33        if graph
34            .find_entity_by_kind_and_name(KIND_DELIVERY, &name)?
35            .is_some()
36        {
37            return Ok(());
38        }
39        let now = chrono::Utc::now().to_rfc3339();
40        let entity = GraphEntity {
41            id: 0,
42            kind: KIND_DELIVERY.to_string(),
43            name,
44            file_path: None,
45            data: serde_json::json!({
46                "agent_id": agent_id,
47                "event_id": event_id,
48                "delivered_at": now,
49            }),
50        };
51        graph.insert_entity(&entity)?;
52        Ok(())
53    }
54
55    /// Get events not yet delivered to an agent for a project.
56    pub fn get_undelivered(
57        &self,
58        graph: &sqlitegraph::SqliteGraph,
59        agent_id: &str,
60        project: &str,
61        limit: Option<i64>,
62    ) -> Result<Vec<EnvoyEvent>> {
63        let events = graph.find_entities_by_kind(KIND_EVENT)?;
64        let deliveries = graph.find_entities_by_kind(KIND_DELIVERY)?;
65
66        let delivered_ids: HashSet<String> = deliveries
67            .iter()
68            .filter(|d| read_str(&d.data, "agent_id") == agent_id)
69            .map(|d| read_str(&d.data, "event_id"))
70            .collect();
71
72        let mut undelivered: Vec<EnvoyEvent> = events
73            .iter()
74            .filter(|e| read_str(&e.data, "project") == project)
75            .filter(|e| !delivered_ids.contains(&e.id.to_string()))
76            .filter_map(|e| entity_to_event(e).ok())
77            .collect();
78
79        undelivered.sort_by_key(|a| a.timestamp);
80        if let Some(limit) = limit {
81            undelivered.truncate(limit as usize);
82        }
83        Ok(undelivered)
84    }
85
86    /// Clean up delivery records older than 24h.
87    pub fn purge_deliveries(&self, graph: &sqlitegraph::SqliteGraph) -> Result<usize> {
88        let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
89        let deliveries = graph.find_entities_by_kind(KIND_DELIVERY)?;
90        let mut purged = 0usize;
91        for d in &deliveries {
92            let ts = read_str(&d.data, "delivered_at");
93            if let Ok(dt) = DateTime::parse_from_rfc3339(&ts) {
94                if dt.with_timezone(&Utc) < cutoff {
95                    match graph.delete_entity(d.id) {
96                        Ok(()) => purged += 1,
97                        Err(e) => eprintln!("warn: failed to purge delivery {}: {}", d.id, e),
98                    }
99                }
100            }
101        }
102        Ok(purged)
103    }
104}
105
/// Entry point for ingesting, querying and purging events stored in the
/// graph.
///
/// The type carries no state of its own, so the derived `Default` yields the
/// same unit value as the `new()` constructor.
#[derive(Default)]
pub struct EventBus;
113
114impl EventBus {
115    pub fn new() -> Self {
116        Self
117    }
118
119    #[allow(clippy::too_many_arguments)]
120    pub fn ingest(
121        &self,
122        graph: &sqlitegraph::SqliteGraph,
123        project: String,
124        event_type: EventType,
125        severity: EventSeverity,
126        source: String,
127        message: String,
128        data: serde_json::Value,
129    ) -> Result<EnvoyEvent> {
130        let timestamp = chrono::Utc::now();
131        let name = format!("evt-{}", uuid::Uuid::new_v4());
132        let entity = GraphEntity {
133            id: 0,
134            kind: KIND_EVENT.to_string(),
135            name,
136            file_path: None,
137            data: serde_json::json!({
138                "project": project,
139                "event_type": event_type.as_str(),
140                "severity": severity.as_str(),
141                "source": source,
142                "message": message,
143                "data": data,
144                "timestamp": timestamp.to_rfc3339(),
145            }),
146        };
147        let id = graph.insert_entity(&entity)?;
148
149        Ok(EnvoyEvent {
150            id: id.to_string(),
151            project,
152            event_type,
153            severity,
154            source,
155            message,
156            data,
157            timestamp,
158        })
159    }
160
161    pub fn query(
162        &self,
163        graph: &sqlitegraph::SqliteGraph,
164        project: &str,
165        since: Option<&str>,
166        limit: Option<i64>,
167    ) -> Result<Vec<EnvoyEvent>> {
168        let since_dt: Option<DateTime<Utc>> = since.and_then(|s| {
169            DateTime::parse_from_rfc3339(s)
170                .ok()
171                .map(|dt| dt.with_timezone(&Utc))
172        });
173        let entities = graph.find_entities_by_kind(KIND_EVENT)?;
174        let mut events: Vec<EnvoyEvent> = entities
175            .iter()
176            .filter(|e| read_str(&e.data, "project") == project)
177            .filter(|e| since_dt.is_none_or(|since| parse_ts(&e.data).is_some_and(|ts| ts > since)))
178            .filter_map(|e| entity_to_event(e).ok())
179            .collect();
180        events.sort_by_key(|b| std::cmp::Reverse(b.timestamp));
181        if let Some(limit) = limit {
182            events.truncate(limit as usize);
183        }
184        Ok(events)
185    }
186
187    pub fn purge_old_events(&self, graph: &sqlitegraph::SqliteGraph) -> Result<usize> {
188        let cutoff = chrono::Utc::now() - chrono::Duration::hours(24);
189        let entities = graph.find_entities_by_kind(KIND_EVENT)?;
190        let mut purged = 0usize;
191        for e in &entities {
192            if parse_ts(&e.data).is_some_and(|ts| ts < cutoff) {
193                match graph.delete_entity(e.id) {
194                    Ok(()) => purged += 1,
195                    Err(err) => eprintln!("warn: failed to purge event {}: {}", e.id, err),
196                }
197            }
198        }
199        Ok(purged)
200    }
201}
202
203fn entity_to_event(entity: &sqlitegraph::GraphEntity) -> Result<EnvoyEvent> {
204    let ts_str = read_str(&entity.data, "timestamp");
205    let timestamp = DateTime::parse_from_rfc3339(&ts_str)
206        .map(|dt| dt.with_timezone(&Utc))
207        .unwrap_or_else(|_| Utc::now());
208    Ok(EnvoyEvent {
209        id: entity.id.to_string(),
210        project: read_str(&entity.data, "project"),
211        event_type: read_str(&entity.data, "event_type")
212            .parse()
213            .unwrap_or(EventType::HookResult),
214        severity: match read_str(&entity.data, "severity").as_str() {
215            "warning" => EventSeverity::Warning,
216            "blocking" => EventSeverity::Blocking,
217            _ => EventSeverity::Info,
218        },
219        source: read_str(&entity.data, "source"),
220        message: read_str(&entity.data, "message"),
221        data: entity
222            .data
223            .get("data")
224            .cloned()
225            .unwrap_or(serde_json::Value::Null),
226        timestamp,
227    })
228}
229
230fn read_str(data: &serde_json::Value, key: &str) -> String {
231    data.get(key)
232        .and_then(|v| v.as_str())
233        .unwrap_or("")
234        .to_string()
235}
236
237fn parse_ts(data: &serde_json::Value) -> Option<DateTime<Utc>> {
238    DateTime::parse_from_rfc3339(&read_str(data, "timestamp"))
239        .ok()
240        .map(|dt| dt.with_timezone(&Utc))
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::Engine;

    /// Ingests a single event through an `EventBus`, panicking if the insert
    /// fails, and hands back the stored event.
    fn emit(
        graph: &sqlitegraph::SqliteGraph,
        project: &str,
        event_type: EventType,
        severity: EventSeverity,
        source: &str,
        message: &str,
        data: serde_json::Value,
    ) -> EnvoyEvent {
        EventBus::new()
            .ingest(
                graph,
                project.into(),
                event_type,
                severity,
                source.into(),
                message.into(),
                data,
            )
            .unwrap()
    }

    #[test]
    fn delivery_tracker_records_and_queries_undelivered() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let tracker = DeliveryTracker::new();

        let evt = emit(
            graph,
            "magellan",
            EventType::CiStatus,
            EventSeverity::Info,
            "ci",
            "test",
            serde_json::json!({}),
        );

        // Nothing recorded yet, so the event is pending for agent-1.
        let pending = tracker
            .get_undelivered(graph, "agent-1", "magellan", None)
            .unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].id, evt.id);

        tracker.record_delivery(graph, "agent-1", &evt.id).unwrap();

        // Once recorded, agent-1 no longer sees it.
        let pending = tracker
            .get_undelivered(graph, "agent-1", "magellan", None)
            .unwrap();
        assert!(pending.is_empty());

        // A different agent is tracked independently and still has it pending.
        let pending = tracker
            .get_undelivered(graph, "agent-2", "magellan", None)
            .unwrap();
        assert_eq!(pending.len(), 1);
    }

    #[test]
    fn delivery_tracker_respects_project_boundary() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let tracker = DeliveryTracker::new();

        emit(
            graph,
            "envoy",
            EventType::DocSync,
            EventSeverity::Info,
            "doc",
            "test",
            serde_json::json!({}),
        );

        // An envoy event must not leak into a magellan query.
        let pending = tracker
            .get_undelivered(graph, "agent-1", "magellan", None)
            .unwrap();
        assert!(pending.is_empty());
    }

    #[test]
    fn ingest_and_query_events() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let bus = EventBus::new();

        emit(
            graph,
            "magellan",
            EventType::HookResult,
            EventSeverity::Warning,
            "hook:stub",
            "stub found",
            serde_json::json!({"hook": "stub-check"}),
        );
        emit(
            graph,
            "magellan",
            EventType::CiStatus,
            EventSeverity::Info,
            "ci:github",
            "CI green",
            serde_json::json!({"run_id": "123"}),
        );

        let results = bus.query(graph, "magellan", None, None).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn filtered_by_project() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let bus = EventBus::new();

        emit(
            graph,
            "envoy",
            EventType::DocSync,
            EventSeverity::Info,
            "doc:wiki",
            "updated",
            serde_json::json!({}),
        );

        assert!(bus.query(graph, "magellan", None, None).unwrap().is_empty());
        assert_eq!(bus.query(graph, "envoy", None, None).unwrap().len(), 1);
    }

    #[test]
    fn purge_old_events() {
        let engine = Engine::open_in_memory().unwrap();
        let graph = engine.graph();
        let bus = EventBus::new();

        emit(
            graph,
            "magellan",
            EventType::DocSync,
            EventSeverity::Info,
            "test",
            "test",
            serde_json::json!({}),
        );

        // An event ingested just now is well inside the retention window.
        let purged = bus.purge_old_events(graph).unwrap();
        assert_eq!(purged, 0);
    }
}