Skip to main content

cortex_runtime/
events.rs

1// Copyright 2026 Cortex Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4//! Cortex Event Bus — typed events from every component.
5//!
6//! The EventBus is a `tokio::sync::broadcast` channel that carries
7//! [`CortexEvent`] values. Any consumer — MCP server, REST SSE endpoint,
8//! web dashboard, log files — can subscribe independently. When no
9//! subscribers exist, events are silently dropped (zero overhead).
10
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13
14/// Every event Cortex emits. Serialized to JSON for SSE, MCP, and socket streaming.
15#[derive(Clone, Debug, Serialize, Deserialize)]
16#[serde(tag = "type")]
17pub enum CortexEvent {
18    // ── Mapping Events ────────────────────
19    /// A new mapping operation has started.
20    MapStarted { domain: String, timestamp: String },
21    /// Sitemap/robots.txt discovery found URLs.
22    SitemapDiscovered {
23        domain: String,
24        url_count: usize,
25        elapsed_ms: u64,
26    },
27    /// HEAD scan progress (emitted every ~500 URLs for large scans).
28    HeadScanProgress {
29        domain: String,
30        scanned: usize,
31        total: usize,
32        live: usize,
33        dead: usize,
34    },
35    /// Structured data extraction complete for a batch of pages.
36    StructuredDataExtracted {
37        domain: String,
38        pages_fetched: usize,
39        jsonld_found: usize,
40        opengraph_found: usize,
41        patterns_used: usize,
42        elapsed_ms: u64,
43    },
44    /// A mapping layer finished processing.
45    LayerComplete {
46        domain: String,
47        layer: u8,
48        layer_name: String,
49        nodes_added: usize,
50        features_filled: usize,
51        elapsed_ms: u64,
52    },
53    /// Mapping completed successfully.
54    MapComplete {
55        domain: String,
56        node_count: usize,
57        edge_count: usize,
58        page_types: usize,
59        total_ms: u64,
60        browser_contexts_used: usize,
61        jsonld_coverage: f32,
62    },
63    /// Mapping failed with an error.
64    MapFailed {
65        domain: String,
66        error: String,
67        elapsed_ms: u64,
68    },
69
70    // ── Action Events ─────────────────────
71    /// An action (add to cart, submit form, etc.) has started.
72    ActionStarted {
73        domain: String,
74        node: usize,
75        action_type: String,
76        execution_path: String,
77    },
78    /// An action completed.
79    ActionComplete {
80        domain: String,
81        node: usize,
82        action_type: String,
83        success: bool,
84        execution_path: String,
85        elapsed_ms: u64,
86    },
87
88    // ── Auth Events ───────────────────────
89    /// Authentication flow started.
90    AuthStarted { domain: String, method: String },
91    /// Authentication flow completed.
92    AuthComplete {
93        domain: String,
94        method: String,
95        success: bool,
96    },
97    /// User consent is required for OAuth/SSO.
98    AuthConsentRequired {
99        domain: String,
100        provider: String,
101        scopes: Vec<String>,
102    },
103
104    // ── Query Events ──────────────────────
105    /// A query/pathfind operation was executed.
106    QueryExecuted {
107        domain: String,
108        query_type: String,
109        results_count: usize,
110        elapsed_us: u64,
111    },
112
113    // ── System Events ─────────────────────
114    /// Cortex runtime started.
115    RuntimeStarted {
116        version: String,
117        http_port: Option<u16>,
118        socket_path: String,
119    },
120    /// An agent client connected.
121    AgentConnected {
122        agent_type: String,
123        agent_name: Option<String>,
124    },
125    /// An agent client disconnected.
126    AgentDisconnected { agent_type: String },
127    /// Periodic cache status update.
128    CacheStatus {
129        maps_cached: usize,
130        total_nodes: usize,
131        memory_mb: f32,
132    },
133}
134
135/// The central event bus for Cortex.
136///
137/// All components emit events through this bus. Consumers subscribe
138/// to receive a stream of all events.
139pub struct EventBus {
140    sender: broadcast::Sender<CortexEvent>,
141}
142
143impl EventBus {
144    /// Create a new event bus with the given buffer capacity.
145    pub fn new(capacity: usize) -> Self {
146        let (sender, _) = broadcast::channel(capacity);
147        Self { sender }
148    }
149
150    /// Emit an event to all subscribers. Silently ignores if no subscribers.
151    pub fn emit(&self, event: CortexEvent) {
152        let _ = self.sender.send(event);
153    }
154
155    /// Subscribe to receive all future events.
156    pub fn subscribe(&self) -> broadcast::Receiver<CortexEvent> {
157        self.sender.subscribe()
158    }
159}
160
161/// Check if an event is related to a specific domain.
162pub fn event_matches_domain(event: &CortexEvent, domain: &str) -> bool {
163    match event {
164        CortexEvent::MapStarted { domain: d, .. }
165        | CortexEvent::SitemapDiscovered { domain: d, .. }
166        | CortexEvent::HeadScanProgress { domain: d, .. }
167        | CortexEvent::StructuredDataExtracted { domain: d, .. }
168        | CortexEvent::LayerComplete { domain: d, .. }
169        | CortexEvent::MapComplete { domain: d, .. }
170        | CortexEvent::MapFailed { domain: d, .. }
171        | CortexEvent::ActionStarted { domain: d, .. }
172        | CortexEvent::ActionComplete { domain: d, .. }
173        | CortexEvent::AuthStarted { domain: d, .. }
174        | CortexEvent::AuthComplete { domain: d, .. }
175        | CortexEvent::AuthConsentRequired { domain: d, .. }
176        | CortexEvent::QueryExecuted { domain: d, .. } => d == domain,
177        // System events are not domain-specific — return true so they reach all subscribers
178        CortexEvent::RuntimeStarted { .. }
179        | CortexEvent::AgentConnected { .. }
180        | CortexEvent::AgentDisconnected { .. }
181        | CortexEvent::CacheStatus { .. } => true,
182    }
183}
184
185/// Get the ISO-8601 timestamp for the current time.
186pub fn now_timestamp() -> String {
187    // Use a simple approach without chrono dependency
188    let dur = std::time::SystemTime::now()
189        .duration_since(std::time::UNIX_EPOCH)
190        .unwrap_or_default();
191    let secs = dur.as_secs();
192    // Basic ISO format: seconds since epoch (consumers can format)
193    format!("{secs}")
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199
200    #[test]
201    fn test_event_serialization() {
202        let event = CortexEvent::MapStarted {
203            domain: "example.com".to_string(),
204            timestamp: "1708276800".to_string(),
205        };
206        let json = serde_json::to_string(&event).unwrap();
207        assert!(json.contains("MapStarted"));
208        assert!(json.contains("example.com"));
209
210        // Roundtrip
211        let parsed: CortexEvent = serde_json::from_str(&json).unwrap();
212        match parsed {
213            CortexEvent::MapStarted { domain, .. } => assert_eq!(domain, "example.com"),
214            _ => panic!("wrong variant"),
215        }
216    }
217
218    #[test]
219    fn test_map_complete_serialization() {
220        let event = CortexEvent::MapComplete {
221            domain: "amazon.com".to_string(),
222            node_count: 47832,
223            edge_count: 142891,
224            page_types: 7,
225            total_ms: 2900,
226            browser_contexts_used: 0,
227            jsonld_coverage: 0.89,
228        };
229        let json = serde_json::to_string(&event).unwrap();
230        assert!(json.contains("MapComplete"));
231        assert!(json.contains("47832"));
232    }
233
234    #[test]
235    fn test_event_bus_emit_no_subscribers() {
236        let bus = EventBus::new(16);
237        // Should not panic when no subscribers
238        bus.emit(CortexEvent::RuntimeStarted {
239            version: "0.4.4".to_string(),
240            http_port: Some(7700),
241            socket_path: "/tmp/cortex.sock".to_string(),
242        });
243    }
244
245    #[test]
246    fn test_event_bus_subscribe_receive() {
247        let bus = EventBus::new(16);
248        let mut rx = bus.subscribe();
249
250        bus.emit(CortexEvent::MapStarted {
251            domain: "test.com".to_string(),
252            timestamp: "123".to_string(),
253        });
254
255        let event = rx.try_recv().unwrap();
256        match event {
257            CortexEvent::MapStarted { domain, .. } => assert_eq!(domain, "test.com"),
258            _ => panic!("wrong event"),
259        }
260    }
261
262    #[test]
263    fn test_event_matches_domain() {
264        let event = CortexEvent::MapStarted {
265            domain: "example.com".to_string(),
266            timestamp: "123".to_string(),
267        };
268        assert!(event_matches_domain(&event, "example.com"));
269        assert!(!event_matches_domain(&event, "other.com"));
270
271        // System events always match
272        let sys = CortexEvent::RuntimeStarted {
273            version: "0.4.4".to_string(),
274            http_port: None,
275            socket_path: "/tmp/cortex.sock".to_string(),
276        };
277        assert!(event_matches_domain(&sys, "anything"));
278    }
279}