Skip to main content

mcpr_core/event/
mod.rs

//! Proxy event pipeline — types, sink trait, bus, and manager.
//!
//! ## Layout
//!
//! - [`types`]: [`ProxyEvent`] enum and its per-variant payload structs.
//! - [`sink`]: [`EventSink`] trait that consumers implement.
//! - [`bus`]: [`EventBus`] (emit handle) and [`EventBusHandle`] (shutdown).
//! - [`manager`]: [`EventManager`] — builder that registers sinks and starts
//!   the bus.
//!
//! Typical wiring:
//!
//! ```rust,ignore
//! use mcpr_core::event::{EventManager, EventSink};
//!
//! let mut manager = EventManager::new();
//! manager.register(Box::new(my_sink));
//! let handle = manager.start();
//! let bus = handle.bus.clone();
//! // … bus.emit(event) from the proxy hot path …
//! handle.shutdown().await;
//! ```
23
24pub mod bus;
25pub mod manager;
26pub mod sink;
27pub mod types;
28
29pub use bus::{EventBus, EventBusHandle};
30pub use manager::EventManager;
31pub use sink::{EventSink, NoopSink};
32pub use types::{
33    HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
34    SessionStartEvent, StageTimings,
35};
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use std::sync::{Arc, Mutex};
41
42    struct MemorySink {
43        events: Arc<Mutex<Vec<ProxyEvent>>>,
44        flush_count: Arc<Mutex<u32>>,
45    }
46
47    impl MemorySink {
48        fn new() -> (Self, Arc<Mutex<Vec<ProxyEvent>>>, Arc<Mutex<u32>>) {
49            let events = Arc::new(Mutex::new(Vec::new()));
50            let flush_count = Arc::new(Mutex::new(0u32));
51            (
52                Self {
53                    events: events.clone(),
54                    flush_count: flush_count.clone(),
55                },
56                events,
57                flush_count,
58            )
59        }
60    }
61
62    impl EventSink for MemorySink {
63        fn on_event(&self, event: &ProxyEvent) {
64            self.events.lock().unwrap().push(event.clone());
65        }
66        fn flush(&self) {
67            *self.flush_count.lock().unwrap() += 1;
68        }
69        fn name(&self) -> &'static str {
70            "memory"
71        }
72    }
73
74    fn test_request(note: &str) -> ProxyEvent {
75        ProxyEvent::Request(Box::new(RequestEvent {
76            id: uuid::Uuid::new_v4().to_string(),
77            ts: chrono::Utc::now().timestamp_millis(),
78            proxy: "test".into(),
79            session_id: None,
80            method: "POST".into(),
81            path: "/mcp".into(),
82            mcp_method: Some("tools/call".into()),
83            tool: Some("search".into()),
84            status: 200,
85            latency_us: 42_000,
86            upstream_us: Some(40_000),
87            request_size: Some(100),
88            response_size: Some(200),
89            error_code: None,
90            error_msg: None,
91            client_name: None,
92            client_version: None,
93            note: note.into(),
94            stage_timings: None,
95        }))
96    }
97
98    fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
99        let mut mgr = EventManager::new();
100        for s in sinks {
101            mgr.register(s);
102        }
103        mgr.start()
104    }
105
106    #[tokio::test]
107    async fn routes_to_single_sink() {
108        let (sink, events, _) = MemorySink::new();
109        let handle = start_with(vec![Box::new(sink)]);
110
111        handle.bus.emit(test_request("a"));
112        handle.bus.emit(test_request("b"));
113
114        handle.shutdown().await;
115
116        let events = events.lock().unwrap();
117        assert_eq!(events.len(), 2);
118    }
119
120    #[tokio::test]
121    async fn routes_to_multiple_sinks() {
122        let (sink1, events1, _) = MemorySink::new();
123        let (sink2, events2, _) = MemorySink::new();
124        let handle = start_with(vec![Box::new(sink1), Box::new(sink2)]);
125
126        handle.bus.emit(test_request("a"));
127
128        handle.shutdown().await;
129
130        assert_eq!(events1.lock().unwrap().len(), 1);
131        assert_eq!(events2.lock().unwrap().len(), 1);
132    }
133
134    #[tokio::test]
135    async fn flushes_on_shutdown() {
136        let (sink, _, flush_count) = MemorySink::new();
137        let handle = start_with(vec![Box::new(sink)]);
138
139        handle.bus.emit(test_request("a"));
140        handle.shutdown().await;
141
142        assert_eq!(*flush_count.lock().unwrap(), 1);
143    }
144
145    #[tokio::test]
146    async fn drains_channel_on_shutdown() {
147        let (sink, events, _) = MemorySink::new();
148        let handle = start_with(vec![Box::new(sink)]);
149
150        for _ in 0..100 {
151            handle.bus.emit(test_request("a"));
152        }
153
154        handle.shutdown().await;
155        assert_eq!(events.lock().unwrap().len(), 100);
156    }
157
158    #[tokio::test]
159    async fn works_with_no_sinks() {
160        let handle = EventManager::new().start();
161        handle.bus.emit(test_request("a"));
162        handle.shutdown().await;
163    }
164}