Skip to main content

mcpr_core/event/
mod.rs

1//! Proxy event pipeline — types, sink trait, bus, and manager.
2//!
3//! ## Layout
4//!
5//! - [`types`]: [`ProxyEvent`] enum and its per-variant payload structs.
6//! - [`sink`]: [`EventSink`] trait that consumers implement.
7//! - [`bus`]: [`EventBus`] (emit handle) and [`EventBusHandle`] (shutdown).
8//! - [`manager`]: [`EventManager`] — builder that registers sinks and starts
9//!   the bus.
10//!
11//! Typical wiring:
12//!
13//! ```rust,ignore
14//! use mcpr_core::event::{EventManager, EventSink};
15//!
16//! let mut manager = EventManager::new();
17//! manager.register(Box::new(my_sink));
18//! let handle = manager.start();
19//! let bus = handle.bus.clone();
20//! // … bus.emit(event) from the proxy hot path …
21//! handle.shutdown().await;
22//! ```
23
24pub mod bus;
25pub mod manager;
26pub mod sink;
27pub mod types;
28
29pub use bus::{EventBus, EventBusHandle};
30pub use manager::EventManager;
31pub use sink::{EventSink, NoopSink};
32pub use types::{
33    HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
34    SessionStartEvent,
35};
36
#[cfg(test)]
mod tests {
    //! Pipeline tests: sinks are registered through [`EventManager`], events
    //! are emitted on the bus, and the recording sink is inspected after
    //! `shutdown().await` completes.

    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared, lock-protected list of the events a [`MemorySink`] received.
    type EventLog = Arc<Mutex<Vec<ProxyEvent>>>;

    /// Shared, lock-protected number of `flush` calls a [`MemorySink`] received.
    type FlushCounter = Arc<Mutex<u32>>;

    /// Sink that records everything it is handed so tests can assert on it.
    struct MemorySink {
        events: EventLog,
        flush_count: FlushCounter,
    }

    impl MemorySink {
        /// Creates a sink together with handles to its event log and flush
        /// counter, so a test can still read them after the sink itself has
        /// been boxed and handed to the manager.
        fn new() -> (Self, EventLog, FlushCounter) {
            let events = EventLog::default();
            let flush_count = FlushCounter::default();
            let sink = Self {
                events: Arc::clone(&events),
                flush_count: Arc::clone(&flush_count),
            };
            (sink, events, flush_count)
        }
    }

    impl EventSink for MemorySink {
        /// Appends a clone of `event` to the shared log.
        fn on_event(&self, event: &ProxyEvent) {
            let mut log = self.events.lock().unwrap();
            log.push(event.clone());
        }

        /// Bumps the shared flush counter by one.
        fn flush(&self) {
            let mut count = self.flush_count.lock().unwrap();
            *count += 1;
        }

        fn name(&self) -> &'static str {
            "memory"
        }
    }

    /// Builds a `ProxyEvent::Request` with fixed sample values; only `note`
    /// (plus the freshly generated id and timestamp) varies between calls.
    fn test_request(note: &str) -> ProxyEvent {
        let request = RequestEvent {
            id: uuid::Uuid::new_v4().to_string(),
            ts: chrono::Utc::now().timestamp_millis(),
            proxy: "test".into(),
            session_id: None,
            method: "POST".into(),
            path: "/mcp".into(),
            mcp_method: Some("tools/call".into()),
            tool: Some("search".into()),
            status: 200,
            latency_us: 42_000,
            upstream_us: Some(40_000),
            request_size: Some(100),
            response_size: Some(200),
            error_code: None,
            error_msg: None,
            client_name: None,
            client_version: None,
            note: note.into(),
        };
        ProxyEvent::Request(Box::new(request))
    }

    /// Registers every sink on a fresh [`EventManager`] and starts it.
    fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
        let mut manager = EventManager::new();
        sinks.into_iter().for_each(|sink| {
            manager.register(sink);
        });
        manager.start()
    }

    /// Two emitted events both reach the one registered sink.
    #[tokio::test]
    async fn routes_to_single_sink() {
        let (sink, events, _) = MemorySink::new();
        let handle = start_with(vec![Box::new(sink)]);

        for note in ["a", "b"] {
            handle.bus.emit(test_request(note));
        }
        handle.shutdown().await;

        let recorded = events.lock().unwrap();
        assert_eq!(recorded.len(), 2);
    }

    /// A single emitted event is delivered to each of two registered sinks.
    #[tokio::test]
    async fn routes_to_multiple_sinks() {
        let (first, first_events, _) = MemorySink::new();
        let (second, second_events, _) = MemorySink::new();
        let handle = start_with(vec![Box::new(first), Box::new(second)]);

        handle.bus.emit(test_request("a"));
        handle.shutdown().await;

        let first_count = first_events.lock().unwrap().len();
        assert_eq!(first_count, 1);
        let second_count = second_events.lock().unwrap().len();
        assert_eq!(second_count, 1);
    }

    /// After shutdown the sink has been flushed exactly once.
    #[tokio::test]
    async fn flushes_on_shutdown() {
        let (sink, _, flush_count) = MemorySink::new();
        let handle = start_with(vec![Box::new(sink)]);

        handle.bus.emit(test_request("a"));
        handle.shutdown().await;

        let flushes = *flush_count.lock().unwrap();
        assert_eq!(flushes, 1);
    }

    /// Every event emitted before shutdown is delivered by the time
    /// `shutdown().await` returns.
    #[tokio::test]
    async fn drains_channel_on_shutdown() {
        const BURST: usize = 100;

        let (sink, events, _) = MemorySink::new();
        let handle = start_with(vec![Box::new(sink)]);

        for _ in 0..BURST {
            handle.bus.emit(test_request("a"));
        }
        handle.shutdown().await;

        let delivered = events.lock().unwrap().len();
        assert_eq!(delivered, BURST);
    }

    /// Emitting and shutting down with no sinks registered completes without
    /// panicking.
    #[tokio::test]
    async fn works_with_no_sinks() {
        let handle = EventManager::new().start();

        handle.bus.emit(test_request("a"));
        handle.shutdown().await;
    }
}