1pub mod bus;
25pub mod manager;
26pub mod sink;
27pub mod types;
28
29pub use bus::{EventBus, EventBusHandle};
30pub use manager::EventManager;
31pub use sink::{EventSink, NoopSink};
32pub use types::{
33 HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
34 SessionStartEvent, StageTimings,
35};
36
37#[cfg(test)]
38mod tests {
39 use super::*;
40 use std::sync::{Arc, Mutex};
41
42 struct MemorySink {
43 events: Arc<Mutex<Vec<ProxyEvent>>>,
44 flush_count: Arc<Mutex<u32>>,
45 }
46
47 impl MemorySink {
48 fn new() -> (Self, Arc<Mutex<Vec<ProxyEvent>>>, Arc<Mutex<u32>>) {
49 let events = Arc::new(Mutex::new(Vec::new()));
50 let flush_count = Arc::new(Mutex::new(0u32));
51 (
52 Self {
53 events: events.clone(),
54 flush_count: flush_count.clone(),
55 },
56 events,
57 flush_count,
58 )
59 }
60 }
61
62 impl EventSink for MemorySink {
63 fn on_event(&self, event: &ProxyEvent) {
64 self.events.lock().unwrap().push(event.clone());
65 }
66 fn flush(&self) {
67 *self.flush_count.lock().unwrap() += 1;
68 }
69 fn name(&self) -> &'static str {
70 "memory"
71 }
72 }
73
74 fn test_request(note: &str) -> ProxyEvent {
75 ProxyEvent::Request(Box::new(RequestEvent {
76 id: uuid::Uuid::new_v4().to_string(),
77 ts: chrono::Utc::now().timestamp_millis(),
78 proxy: "test".into(),
79 session_id: None,
80 method: "POST".into(),
81 path: "/mcp".into(),
82 mcp_method: Some("tools/call".into()),
83 tool: Some("search".into()),
84 status: 200,
85 latency_us: 42_000,
86 upstream_us: Some(40_000),
87 request_size: Some(100),
88 response_size: Some(200),
89 error_code: None,
90 error_msg: None,
91 client_name: None,
92 client_version: None,
93 note: note.into(),
94 stage_timings: None,
95 }))
96 }
97
98 fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
99 let mut mgr = EventManager::new();
100 for s in sinks {
101 mgr.register(s);
102 }
103 mgr.start()
104 }
105
106 #[tokio::test]
107 async fn routes_to_single_sink() {
108 let (sink, events, _) = MemorySink::new();
109 let handle = start_with(vec![Box::new(sink)]);
110
111 handle.bus.emit(test_request("a"));
112 handle.bus.emit(test_request("b"));
113
114 handle.shutdown().await;
115
116 let events = events.lock().unwrap();
117 assert_eq!(events.len(), 2);
118 }
119
120 #[tokio::test]
121 async fn routes_to_multiple_sinks() {
122 let (sink1, events1, _) = MemorySink::new();
123 let (sink2, events2, _) = MemorySink::new();
124 let handle = start_with(vec![Box::new(sink1), Box::new(sink2)]);
125
126 handle.bus.emit(test_request("a"));
127
128 handle.shutdown().await;
129
130 assert_eq!(events1.lock().unwrap().len(), 1);
131 assert_eq!(events2.lock().unwrap().len(), 1);
132 }
133
134 #[tokio::test]
135 async fn flushes_on_shutdown() {
136 let (sink, _, flush_count) = MemorySink::new();
137 let handle = start_with(vec![Box::new(sink)]);
138
139 handle.bus.emit(test_request("a"));
140 handle.shutdown().await;
141
142 assert_eq!(*flush_count.lock().unwrap(), 1);
143 }
144
145 #[tokio::test]
146 async fn drains_channel_on_shutdown() {
147 let (sink, events, _) = MemorySink::new();
148 let handle = start_with(vec![Box::new(sink)]);
149
150 for _ in 0..100 {
151 handle.bus.emit(test_request("a"));
152 }
153
154 handle.shutdown().await;
155 assert_eq!(events.lock().unwrap().len(), 100);
156 }
157
158 #[tokio::test]
159 async fn works_with_no_sinks() {
160 let handle = EventManager::new().start();
161 handle.bus.emit(test_request("a"));
162 handle.shutdown().await;
163 }
164}