Skip to main content

mcpr_core/event/
mod.rs

1//! Proxy event pipeline — types, sink trait, bus, and manager.
2//!
3//! ## Layout
4//!
5//! - [`types`]: [`ProxyEvent`] enum and its per-variant payload structs.
6//! - [`sink`]: [`EventSink`] trait that consumers implement.
7//! - [`bus`]: [`EventBus`] (emit handle) and [`EventBusHandle`] (shutdown).
8//! - [`manager`]: [`EventManager`] — builder that registers sinks and starts
9//!   the bus.
10//!
11//! Typical wiring:
12//!
13//! ```rust,ignore
14//! use mcpr_core::event::{EventManager, EventSink};
15//!
16//! let mut manager = EventManager::new();
17//! manager.register(Box::new(my_sink));
18//! let handle = manager.start();
19//! let bus = handle.bus.clone();
20//! // … bus.emit(event) from the proxy hot path …
21//! handle.shutdown().await;
22//! ```
23
24pub mod bus;
25pub mod manager;
26pub mod sink;
27pub mod types;
28
29pub use bus::{EventBus, EventBusHandle};
30pub use manager::EventManager;
31pub use sink::{EventSink, NoopSink};
32pub use types::{
33    HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
34    SessionStartEvent,
35};
36
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shared log of the events a [`MemorySink`] has received.
    type SharedEvents = Arc<Mutex<Vec<ProxyEvent>>>;
    /// Shared counter of the `flush` calls a [`MemorySink`] has received.
    type SharedCounter = Arc<Mutex<u32>>;

    /// Test sink that keeps a clone of every event in memory and counts
    /// how many times `flush` was invoked.
    struct MemorySink {
        events: SharedEvents,
        flush_count: SharedCounter,
    }

    impl MemorySink {
        /// Creates a sink plus handles onto its event log and flush counter.
        ///
        /// The sink itself is moved into the manager, so the test keeps the
        /// two returned `Arc`s to inspect what the sink observed.
        fn new() -> (Self, SharedEvents, SharedCounter) {
            let events: SharedEvents = Arc::new(Mutex::new(Vec::new()));
            let flush_count: SharedCounter = Arc::new(Mutex::new(0));
            let sink = Self {
                events: Arc::clone(&events),
                flush_count: Arc::clone(&flush_count),
            };
            (sink, events, flush_count)
        }
    }

    impl EventSink for MemorySink {
        /// Appends a clone of `event` to the shared log.
        fn on_event(&self, event: &ProxyEvent) {
            let mut log = self.events.lock().unwrap();
            log.push(event.clone());
        }

        /// Bumps the shared flush counter by one.
        fn flush(&self) {
            let mut flushes = self.flush_count.lock().unwrap();
            *flushes += 1;
        }

        fn name(&self) -> &'static str {
            "memory"
        }
    }

    /// Builds a `ProxyEvent::Request` with fixed sample values; only the
    /// id, the timestamp and `note` differ between calls.
    fn test_request(note: &str) -> ProxyEvent {
        let request = RequestEvent {
            id: uuid::Uuid::new_v4().to_string(),
            ts: chrono::Utc::now().timestamp_millis(),
            proxy: "test".into(),
            session_id: None,
            method: "POST".into(),
            path: "/mcp".into(),
            mcp_method: Some("tools/call".into()),
            tool: Some("search".into()),
            status: 200,
            latency_us: 42_000,
            upstream_us: Some(40_000),
            request_size: Some(100),
            response_size: Some(200),
            error_code: None,
            error_msg: None,
            client_name: None,
            client_version: None,
            note: note.into(),
            stage_timings: None,
        };
        ProxyEvent::Request(Box::new(request))
    }

    /// Registers every sink on a fresh [`EventManager`], in the order given,
    /// and starts it.
    fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
        let mut manager = EventManager::new();
        sinks.into_iter().for_each(|sink| {
            manager.register(sink);
        });
        manager.start()
    }

    /// Two events emitted before shutdown are both recorded by the one
    /// registered sink.
    #[tokio::test]
    async fn routes_to_single_sink() {
        let (sink, recorded, _) = MemorySink::new();
        let handle = start_with(vec![Box::new(sink)]);

        for note in ["a", "b"] {
            handle.bus.emit(test_request(note));
        }

        handle.shutdown().await;

        assert_eq!(recorded.lock().unwrap().len(), 2);
    }

    /// A single emitted event is recorded once by each of two sinks.
    #[tokio::test]
    async fn routes_to_multiple_sinks() {
        let (first, first_events, _) = MemorySink::new();
        let (second, second_events, _) = MemorySink::new();
        let handle = start_with(vec![Box::new(first), Box::new(second)]);

        handle.bus.emit(test_request("a"));

        handle.shutdown().await;

        assert_eq!(first_events.lock().unwrap().len(), 1);
        assert_eq!(second_events.lock().unwrap().len(), 1);
    }

    /// After shutdown the sink has been flushed exactly once.
    #[tokio::test]
    async fn flushes_on_shutdown() {
        let (sink, _, flushes) = MemorySink::new();
        let handle = start_with(vec![Box::new(sink)]);

        handle.bus.emit(test_request("a"));
        handle.shutdown().await;

        let observed = *flushes.lock().unwrap();
        assert_eq!(observed, 1);
    }

    /// A burst of events emitted right before shutdown is fully recorded
    /// by the sink.
    #[tokio::test]
    async fn drains_channel_on_shutdown() {
        const BURST: usize = 100;

        let (sink, recorded, _) = MemorySink::new();
        let handle = start_with(vec![Box::new(sink)]);

        for _ in 0..BURST {
            handle.bus.emit(test_request("a"));
        }

        handle.shutdown().await;
        assert_eq!(recorded.lock().unwrap().len(), BURST);
    }

    /// Emitting and shutting down with no sinks registered completes
    /// without panicking.
    #[tokio::test]
    async fn works_with_no_sinks() {
        let handle = EventManager::new().start();
        handle.bus.emit(test_request("a"));
        handle.shutdown().await;
    }
}