Skip to main content

mcpr_core/event/
mod.rs

1//! Proxy event pipeline — types, sink trait, bus, and manager.
2//!
3//! ## Layout
4//!
5//! - [`types`]: [`ProxyEvent`] enum and its per-variant payload structs.
6//! - [`sink`]: [`EventSink`] trait that consumers implement.
7//! - [`bus`]: [`EventBus`] (emit handle) and [`EventBusHandle`] (shutdown).
8//! - [`manager`]: [`EventManager`] — builder that registers sinks and starts
9//!   the bus.
10//!
11//! Typical wiring:
12//!
13//! ```rust,ignore
14//! use mcpr_core::event::{EventManager, EventSink};
15//!
16//! let mut manager = EventManager::new();
17//! manager.register(Box::new(my_sink));
18//! let handle = manager.start();
19//! let bus = handle.bus.clone();
20//! // … bus.emit(event) from the proxy hot path …
21//! handle.shutdown().await;
22//! ```
23
24pub mod bus;
25pub mod manager;
26pub mod sink;
27pub mod types;
28
29pub use bus::{EventBus, EventBusHandle};
30pub use manager::EventManager;
31pub use sink::{EventSink, NoopSink};
32pub use types::{
33    HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
34    SessionStartEvent,
35};
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use std::sync::{Arc, Mutex};
41
42    struct MemorySink {
43        events: Arc<Mutex<Vec<ProxyEvent>>>,
44        flush_count: Arc<Mutex<u32>>,
45    }
46
47    impl MemorySink {
48        #[allow(clippy::type_complexity)]
49        fn new() -> (Self, Arc<Mutex<Vec<ProxyEvent>>>, Arc<Mutex<u32>>) {
50            let events = Arc::new(Mutex::new(Vec::new()));
51            let flush_count = Arc::new(Mutex::new(0u32));
52            (
53                Self {
54                    events: events.clone(),
55                    flush_count: flush_count.clone(),
56                },
57                events,
58                flush_count,
59            )
60        }
61    }
62
63    impl EventSink for MemorySink {
64        fn on_event(&self, event: &ProxyEvent) {
65            self.events.lock().unwrap().push(event.clone());
66        }
67        fn flush(&self) {
68            *self.flush_count.lock().unwrap() += 1;
69        }
70        fn name(&self) -> &'static str {
71            "memory"
72        }
73    }
74
75    fn test_request(note: &str) -> ProxyEvent {
76        ProxyEvent::Request(Box::new(RequestEvent {
77            id: uuid::Uuid::new_v4().to_string(),
78            ts: chrono::Utc::now().timestamp_millis(),
79            proxy: "test".into(),
80            session_id: None,
81            method: "POST".into(),
82            path: "/mcp".into(),
83            mcp_method: Some("tools/call".into()),
84            tool: Some("search".into()),
85            resource_uri: None,
86            prompt_name: None,
87            status: 200,
88            latency_us: 42_000,
89            upstream_us: Some(40_000),
90            request_size: Some(100),
91            response_size: Some(200),
92            error_code: None,
93            error_msg: None,
94            client_name: None,
95            client_version: None,
96            note: note.into(),
97            stage_timings: None,
98        }))
99    }
100
101    fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
102        let mut mgr = EventManager::new();
103        for s in sinks {
104            mgr.register(s);
105        }
106        mgr.start()
107    }
108
109    #[tokio::test]
110    async fn routes_to_single_sink() {
111        let (sink, events, _) = MemorySink::new();
112        let handle = start_with(vec![Box::new(sink)]);
113
114        handle.bus.emit(test_request("a"));
115        handle.bus.emit(test_request("b"));
116
117        handle.shutdown().await;
118
119        let events = events.lock().unwrap();
120        assert_eq!(events.len(), 2);
121    }
122
123    #[tokio::test]
124    async fn routes_to_multiple_sinks() {
125        let (sink1, events1, _) = MemorySink::new();
126        let (sink2, events2, _) = MemorySink::new();
127        let handle = start_with(vec![Box::new(sink1), Box::new(sink2)]);
128
129        handle.bus.emit(test_request("a"));
130
131        handle.shutdown().await;
132
133        assert_eq!(events1.lock().unwrap().len(), 1);
134        assert_eq!(events2.lock().unwrap().len(), 1);
135    }
136
137    #[tokio::test]
138    async fn flushes_on_shutdown() {
139        let (sink, _, flush_count) = MemorySink::new();
140        let handle = start_with(vec![Box::new(sink)]);
141
142        handle.bus.emit(test_request("a"));
143        handle.shutdown().await;
144
145        assert_eq!(*flush_count.lock().unwrap(), 1);
146    }
147
148    #[tokio::test]
149    async fn drains_channel_on_shutdown() {
150        let (sink, events, _) = MemorySink::new();
151        let handle = start_with(vec![Box::new(sink)]);
152
153        for _ in 0..100 {
154            handle.bus.emit(test_request("a"));
155        }
156
157        handle.shutdown().await;
158        assert_eq!(events.lock().unwrap().len(), 100);
159    }
160
161    #[tokio::test]
162    async fn works_with_no_sinks() {
163        let handle = EventManager::new().start();
164        handle.bus.emit(test_request("a"));
165        handle.shutdown().await;
166    }
167}