1use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13
14#[derive(Clone, Debug, Serialize, Deserialize)]
16#[serde(tag = "type")]
17pub enum CortexEvent {
18 MapStarted { domain: String, timestamp: String },
21 SitemapDiscovered {
23 domain: String,
24 url_count: usize,
25 elapsed_ms: u64,
26 },
27 HeadScanProgress {
29 domain: String,
30 scanned: usize,
31 total: usize,
32 live: usize,
33 dead: usize,
34 },
35 StructuredDataExtracted {
37 domain: String,
38 pages_fetched: usize,
39 jsonld_found: usize,
40 opengraph_found: usize,
41 patterns_used: usize,
42 elapsed_ms: u64,
43 },
44 LayerComplete {
46 domain: String,
47 layer: u8,
48 layer_name: String,
49 nodes_added: usize,
50 features_filled: usize,
51 elapsed_ms: u64,
52 },
53 MapComplete {
55 domain: String,
56 node_count: usize,
57 edge_count: usize,
58 page_types: usize,
59 total_ms: u64,
60 browser_contexts_used: usize,
61 jsonld_coverage: f32,
62 },
63 MapFailed {
65 domain: String,
66 error: String,
67 elapsed_ms: u64,
68 },
69
70 ActionStarted {
73 domain: String,
74 node: usize,
75 action_type: String,
76 execution_path: String,
77 },
78 ActionComplete {
80 domain: String,
81 node: usize,
82 action_type: String,
83 success: bool,
84 execution_path: String,
85 elapsed_ms: u64,
86 },
87
88 AuthStarted { domain: String, method: String },
91 AuthComplete {
93 domain: String,
94 method: String,
95 success: bool,
96 },
97 AuthConsentRequired {
99 domain: String,
100 provider: String,
101 scopes: Vec<String>,
102 },
103
104 QueryExecuted {
107 domain: String,
108 query_type: String,
109 results_count: usize,
110 elapsed_us: u64,
111 },
112
113 RuntimeStarted {
116 version: String,
117 http_port: Option<u16>,
118 socket_path: String,
119 },
120 AgentConnected {
122 agent_type: String,
123 agent_name: Option<String>,
124 },
125 AgentDisconnected { agent_type: String },
127 CacheStatus {
129 maps_cached: usize,
130 total_nodes: usize,
131 memory_mb: f32,
132 },
133}
134
135pub struct EventBus {
140 sender: broadcast::Sender<CortexEvent>,
141}
142
143impl EventBus {
144 pub fn new(capacity: usize) -> Self {
146 let (sender, _) = broadcast::channel(capacity);
147 Self { sender }
148 }
149
150 pub fn emit(&self, event: CortexEvent) {
152 let _ = self.sender.send(event);
153 }
154
155 pub fn subscribe(&self) -> broadcast::Receiver<CortexEvent> {
157 self.sender.subscribe()
158 }
159}
160
161pub fn event_matches_domain(event: &CortexEvent, domain: &str) -> bool {
163 match event {
164 CortexEvent::MapStarted { domain: d, .. }
165 | CortexEvent::SitemapDiscovered { domain: d, .. }
166 | CortexEvent::HeadScanProgress { domain: d, .. }
167 | CortexEvent::StructuredDataExtracted { domain: d, .. }
168 | CortexEvent::LayerComplete { domain: d, .. }
169 | CortexEvent::MapComplete { domain: d, .. }
170 | CortexEvent::MapFailed { domain: d, .. }
171 | CortexEvent::ActionStarted { domain: d, .. }
172 | CortexEvent::ActionComplete { domain: d, .. }
173 | CortexEvent::AuthStarted { domain: d, .. }
174 | CortexEvent::AuthComplete { domain: d, .. }
175 | CortexEvent::AuthConsentRequired { domain: d, .. }
176 | CortexEvent::QueryExecuted { domain: d, .. } => d == domain,
177 CortexEvent::RuntimeStarted { .. }
179 | CortexEvent::AgentConnected { .. }
180 | CortexEvent::AgentDisconnected { .. }
181 | CortexEvent::CacheStatus { .. } => true,
182 }
183}
184
/// Returns the current system time as whole seconds since the Unix epoch,
/// rendered as a decimal string (for example `"1708276800"`).
///
/// If the system clock reads earlier than the epoch, `duration_since` fails
/// and the result is `"0"` rather than an error or a panic.
pub fn now_timestamp() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // A pre-epoch clock yields `Err`; fall back to zero seconds.
        .map_or(0, |since_epoch| since_epoch.as_secs())
        .to_string()
}
195
#[cfg(test)]
mod tests {
    use super::*;

    /// A `MapStarted` event serializes to JSON containing its variant name and
    /// domain, and deserializes back into the same variant.
    #[test]
    fn test_event_serialization() {
        let original = CortexEvent::MapStarted {
            domain: "example.com".to_string(),
            timestamp: "1708276800".to_string(),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains("MapStarted"));
        assert!(encoded.contains("example.com"));

        let decoded: CortexEvent = serde_json::from_str(&encoded).unwrap();
        if let CortexEvent::MapStarted { domain, .. } = decoded {
            assert_eq!(domain, "example.com");
        } else {
            panic!("wrong variant");
        }
    }

    /// A `MapComplete` event serializes with its variant name and numeric
    /// fields present in the JSON text.
    #[test]
    fn test_map_complete_serialization() {
        let summary = CortexEvent::MapComplete {
            domain: "amazon.com".to_string(),
            node_count: 47832,
            edge_count: 142891,
            page_types: 7,
            total_ms: 2900,
            browser_contexts_used: 0,
            jsonld_coverage: 0.89,
        };

        let encoded = serde_json::to_string(&summary).unwrap();
        assert!(encoded.contains("MapComplete"));
        assert!(encoded.contains("47832"));
    }

    /// Emitting on a bus that has no receivers must not panic.
    #[test]
    fn test_event_bus_emit_no_subscribers() {
        let started = CortexEvent::RuntimeStarted {
            version: "0.4.4".to_string(),
            http_port: Some(7700),
            socket_path: "/tmp/cortex.sock".to_string(),
        };

        EventBus::new(16).emit(started);
    }

    /// A receiver created before `emit` gets the emitted event.
    #[test]
    fn test_event_bus_subscribe_receive() {
        let bus = EventBus::new(16);
        let mut receiver = bus.subscribe();

        bus.emit(CortexEvent::MapStarted {
            domain: "test.com".to_string(),
            timestamp: "123".to_string(),
        });

        let received = receiver.try_recv().unwrap();
        if let CortexEvent::MapStarted { domain, .. } = received {
            assert_eq!(domain, "test.com");
        } else {
            panic!("wrong event");
        }
    }

    /// Domain-carrying events match only their own domain; events without a
    /// domain match any filter.
    #[test]
    fn test_event_matches_domain() {
        let scoped = CortexEvent::MapStarted {
            domain: "example.com".to_string(),
            timestamp: "123".to_string(),
        };
        assert!(event_matches_domain(&scoped, "example.com"));
        assert!(!event_matches_domain(&scoped, "other.com"));

        let unscoped = CortexEvent::RuntimeStarted {
            version: "0.4.4".to_string(),
            http_port: None,
            socket_path: "/tmp/cortex.sock".to_string(),
        };
        assert!(event_matches_domain(&unscoped, "anything"));
    }
}