cortex_memory/observability/
mod.rs1use cortex_core::hooks::{MutationAction, MutationHook};
4use cortex_core::{Edge, Node};
5use serde::Serialize;
6use tokio::sync::broadcast;
7
8#[derive(Debug, Clone, Serialize)]
10pub struct GraphEvent {
11 pub event_type: String,
14 pub timestamp: String,
16 pub data: serde_json::Value,
18}
19
20pub type EventBus = broadcast::Sender<GraphEvent>;
22
23pub fn new_event_bus(capacity: usize) -> EventBus {
25 let (tx, _rx) = broadcast::channel(capacity);
26 tx
27}
28
29pub struct EventBusHook {
33 bus: EventBus,
34}
35
36impl EventBusHook {
37 pub fn new(bus: EventBus) -> Self {
38 Self { bus }
39 }
40
41 fn emit(&self, event: GraphEvent) {
42 let _ = self.bus.send(event);
44 }
45}
46
47impl MutationHook for EventBusHook {
48 fn on_node_mutation(&self, node: &Node, action: MutationAction) {
49 let event_type = match action {
50 MutationAction::Created => "node.created",
51 MutationAction::Updated => "node.updated",
52 MutationAction::Deleted => "node.deleted",
53 };
54
55 self.emit(GraphEvent {
56 event_type: event_type.to_string(),
57 timestamp: chrono::Utc::now().to_rfc3339(),
58 data: serde_json::json!({
59 "id": node.id.to_string(),
60 "kind": node.kind.as_str(),
61 "title": node.data.title,
62 "agent": node.source.agent,
63 "importance": node.importance,
64 }),
65 });
66 }
67
68 fn on_edge_mutation(&self, edge: &Edge, action: MutationAction) {
69 let event_type = match action {
70 MutationAction::Created => "edge.created",
71 MutationAction::Updated => "edge.updated",
72 MutationAction::Deleted => "edge.deleted",
73 };
74
75 self.emit(GraphEvent {
76 event_type: event_type.to_string(),
77 timestamp: chrono::Utc::now().to_rfc3339(),
78 data: serde_json::json!({
79 "id": edge.id.to_string(),
80 "from": edge.from.to_string(),
81 "to": edge.to.to_string(),
82 "relation": edge.relation.as_str(),
83 "weight": edge.weight,
84 }),
85 });
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use cortex_core::{EdgeProvenance, NodeKind, Relation, Source};
93
94 fn make_test_node() -> Node {
95 Node::new(
96 NodeKind::new("fact").unwrap(),
97 "Test SSE node".to_string(),
98 "Body content for SSE test".to_string(),
99 Source {
100 agent: "test-agent".to_string(),
101 session: None,
102 channel: None,
103 },
104 0.7,
105 )
106 }
107
108 fn make_test_edge() -> Edge {
109 Edge::new(
110 uuid::Uuid::now_v7(),
111 uuid::Uuid::now_v7(),
112 Relation::new("related_to").unwrap(),
113 0.85,
114 EdgeProvenance::Manual {
115 created_by: "test-agent".to_string(),
116 },
117 )
118 }
119
120 #[test]
121 fn test_event_bus_creation() {
122 let bus = new_event_bus(64);
123 let _rx = bus.subscribe();
125 assert_eq!(bus.receiver_count(), 1);
126 }
127
128 #[test]
129 fn test_event_bus_hook_emits_node_events() {
130 let bus = new_event_bus(64);
131 let mut rx = bus.subscribe();
132 let hook = EventBusHook::new(bus);
133
134 let node = make_test_node();
135 hook.on_node_mutation(&node, MutationAction::Created);
136
137 let event = rx.try_recv().unwrap();
138 assert_eq!(event.event_type, "node.created");
139 assert_eq!(event.data["kind"], "fact");
140 assert_eq!(event.data["title"], "Test SSE node");
141 assert_eq!(event.data["agent"], "test-agent");
142 }
143
144 #[test]
145 fn test_event_bus_hook_emits_edge_events() {
146 let bus = new_event_bus(64);
147 let mut rx = bus.subscribe();
148 let hook = EventBusHook::new(bus);
149
150 let edge = make_test_edge();
151 hook.on_edge_mutation(&edge, MutationAction::Created);
152
153 let event = rx.try_recv().unwrap();
154 assert_eq!(event.event_type, "edge.created");
155 assert_eq!(event.data["relation"], "related_to");
156 }
157
158 #[test]
159 fn test_event_bus_hook_no_receivers_is_ok() {
160 let bus = new_event_bus(64);
161 let hook = EventBusHook::new(bus);
163 let node = make_test_node();
164 hook.on_node_mutation(&node, MutationAction::Created);
165 }
166
167 #[test]
168 fn test_event_bus_hook_all_node_actions() {
169 let bus = new_event_bus(64);
170 let mut rx = bus.subscribe();
171 let hook = EventBusHook::new(bus);
172 let node = make_test_node();
173
174 hook.on_node_mutation(&node, MutationAction::Created);
175 hook.on_node_mutation(&node, MutationAction::Updated);
176 hook.on_node_mutation(&node, MutationAction::Deleted);
177
178 assert_eq!(rx.try_recv().unwrap().event_type, "node.created");
179 assert_eq!(rx.try_recv().unwrap().event_type, "node.updated");
180 assert_eq!(rx.try_recv().unwrap().event_type, "node.deleted");
181 }
182
183 #[test]
184 fn test_event_bus_hook_all_edge_actions() {
185 let bus = new_event_bus(64);
186 let mut rx = bus.subscribe();
187 let hook = EventBusHook::new(bus);
188 let edge = make_test_edge();
189
190 hook.on_edge_mutation(&edge, MutationAction::Created);
191 hook.on_edge_mutation(&edge, MutationAction::Updated);
192 hook.on_edge_mutation(&edge, MutationAction::Deleted);
193
194 assert_eq!(rx.try_recv().unwrap().event_type, "edge.created");
195 assert_eq!(rx.try_recv().unwrap().event_type, "edge.updated");
196 assert_eq!(rx.try_recv().unwrap().event_type, "edge.deleted");
197 }
198
199 #[test]
200 fn test_graph_event_serialization() {
201 let event = GraphEvent {
202 event_type: "node.created".to_string(),
203 timestamp: "2026-01-01T00:00:00+00:00".to_string(),
204 data: serde_json::json!({"id": "abc", "kind": "fact"}),
205 };
206 let json = serde_json::to_string(&event).unwrap();
207 assert!(json.contains("node.created"));
208 assert!(json.contains("event_type"));
209 assert!(json.contains("timestamp"));
210 }
211}