Skip to main content

cortex_memory/observability/
mod.rs

1//! Observability — SSE event streaming for real-time graph change notifications.
2
3use cortex_core::hooks::{MutationAction, MutationHook};
4use cortex_core::{Edge, Node};
5use serde::Serialize;
6use tokio::sync::broadcast;
7
8/// A graph mutation event broadcast to SSE clients.
9#[derive(Debug, Clone, Serialize)]
10pub struct GraphEvent {
11    /// Event type: "node.created", "node.updated", "node.deleted",
12    /// "edge.created", "edge.updated", "edge.deleted"
13    pub event_type: String,
14    /// ISO-8601 timestamp
15    pub timestamp: String,
16    /// Event payload
17    pub data: serde_json::Value,
18}
19
20/// Broadcast channel type alias.
21pub type EventBus = broadcast::Sender<GraphEvent>;
22
23/// Creates a new event bus with the given capacity.
24pub fn new_event_bus(capacity: usize) -> EventBus {
25    let (tx, _rx) = broadcast::channel(capacity);
26    tx
27}
28
29/// A MutationHook that bridges core mutations to the server's EventBus broadcast channel.
30///
31/// Register this hook so ALL mutations (gRPC, auto-linker, library mode) emit SSE events.
32pub struct EventBusHook {
33    bus: EventBus,
34}
35
36impl EventBusHook {
37    pub fn new(bus: EventBus) -> Self {
38        Self { bus }
39    }
40
41    fn emit(&self, event: GraphEvent) {
42        // Ignore send errors — no receivers means no one is listening (that's fine)
43        let _ = self.bus.send(event);
44    }
45}
46
47impl MutationHook for EventBusHook {
48    fn on_node_mutation(&self, node: &Node, action: MutationAction) {
49        let event_type = match action {
50            MutationAction::Created => "node.created",
51            MutationAction::Updated => "node.updated",
52            MutationAction::Deleted => "node.deleted",
53        };
54
55        self.emit(GraphEvent {
56            event_type: event_type.to_string(),
57            timestamp: chrono::Utc::now().to_rfc3339(),
58            data: serde_json::json!({
59                "id": node.id.to_string(),
60                "kind": node.kind.as_str(),
61                "title": node.data.title,
62                "agent": node.source.agent,
63                "importance": node.importance,
64            }),
65        });
66    }
67
68    fn on_edge_mutation(&self, edge: &Edge, action: MutationAction) {
69        let event_type = match action {
70            MutationAction::Created => "edge.created",
71            MutationAction::Updated => "edge.updated",
72            MutationAction::Deleted => "edge.deleted",
73        };
74
75        self.emit(GraphEvent {
76            event_type: event_type.to_string(),
77            timestamp: chrono::Utc::now().to_rfc3339(),
78            data: serde_json::json!({
79                "id": edge.id.to_string(),
80                "from": edge.from.to_string(),
81                "to": edge.to.to_string(),
82                "relation": edge.relation.as_str(),
83                "weight": edge.weight,
84            }),
85        });
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use cortex_core::{EdgeProvenance, NodeKind, Relation, Source};

    /// Fixture: a hook over a fresh 64-slot bus, plus a receiver that was
    /// subscribed before the hook took ownership of the bus.
    fn hook_with_subscriber() -> (EventBusHook, broadcast::Receiver<GraphEvent>) {
        let bus = new_event_bus(64);
        let rx = bus.subscribe();
        (EventBusHook::new(bus), rx)
    }

    /// Fixture: a "fact" node authored by `test-agent`.
    fn make_test_node() -> Node {
        let kind = NodeKind::new("fact").unwrap();
        let source = Source {
            agent: "test-agent".to_owned(),
            session: None,
            channel: None,
        };
        Node::new(
            kind,
            "Test SSE node".to_owned(),
            "Body content for SSE test".to_owned(),
            source,
            0.7,
        )
    }

    /// Fixture: a manually created `related_to` edge between two fresh ids.
    fn make_test_edge() -> Edge {
        let from = uuid::Uuid::now_v7();
        let to = uuid::Uuid::now_v7();
        let provenance = EdgeProvenance::Manual {
            created_by: "test-agent".to_owned(),
        };
        Edge::new(
            from,
            to,
            Relation::new("related_to").unwrap(),
            0.85,
            provenance,
        )
    }

    #[test]
    fn test_event_bus_creation() {
        let bus = new_event_bus(64);
        // Keep the receiver bound so it is still counted by the assertion.
        let _subscriber = bus.subscribe();
        assert_eq!(bus.receiver_count(), 1);
    }

    #[test]
    fn test_event_bus_hook_emits_node_events() {
        let (hook, mut rx) = hook_with_subscriber();

        hook.on_node_mutation(&make_test_node(), MutationAction::Created);

        let received = rx.try_recv().unwrap();
        assert_eq!(received.event_type, "node.created");
        assert_eq!(received.data["kind"], "fact");
        assert_eq!(received.data["title"], "Test SSE node");
        assert_eq!(received.data["agent"], "test-agent");
    }

    #[test]
    fn test_event_bus_hook_emits_edge_events() {
        let (hook, mut rx) = hook_with_subscriber();

        hook.on_edge_mutation(&make_test_edge(), MutationAction::Created);

        let received = rx.try_recv().unwrap();
        assert_eq!(received.event_type, "edge.created");
        assert_eq!(received.data["relation"], "related_to");
    }

    #[test]
    fn test_event_bus_hook_no_receivers_is_ok() {
        // With nobody subscribed, emitting must simply not panic.
        let hook = EventBusHook::new(new_event_bus(64));
        hook.on_node_mutation(&make_test_node(), MutationAction::Created);
    }

    #[test]
    fn test_event_bus_hook_all_node_actions() {
        let (hook, mut rx) = hook_with_subscriber();
        let node = make_test_node();

        for action in [
            MutationAction::Created,
            MutationAction::Updated,
            MutationAction::Deleted,
        ] {
            hook.on_node_mutation(&node, action);
        }

        // Events must come back in the order they were emitted.
        for label in ["node.created", "node.updated", "node.deleted"] {
            assert_eq!(rx.try_recv().unwrap().event_type, label);
        }
    }

    #[test]
    fn test_event_bus_hook_all_edge_actions() {
        let (hook, mut rx) = hook_with_subscriber();
        let edge = make_test_edge();

        for action in [
            MutationAction::Created,
            MutationAction::Updated,
            MutationAction::Deleted,
        ] {
            hook.on_edge_mutation(&edge, action);
        }

        // Events must come back in the order they were emitted.
        for label in ["edge.created", "edge.updated", "edge.deleted"] {
            assert_eq!(rx.try_recv().unwrap().event_type, label);
        }
    }

    #[test]
    fn test_graph_event_serialization() {
        let sample = GraphEvent {
            event_type: "node.created".to_owned(),
            timestamp: "2026-01-01T00:00:00+00:00".to_owned(),
            data: serde_json::json!({"id": "abc", "kind": "fact"}),
        };

        let rendered = serde_json::to_string(&sample).unwrap();
        for needle in ["node.created", "event_type", "timestamp"] {
            assert!(rendered.contains(needle));
        }
    }
}