1pub mod bus;
25pub mod manager;
26pub mod sink;
27pub mod types;
28
29pub use bus::{EventBus, EventBusHandle};
30pub use manager::EventManager;
31pub use sink::{EventSink, NoopSink};
32pub use types::{
33 HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
34 SessionStartEvent,
35};
36
37#[cfg(test)]
38mod tests {
39 use super::*;
40 use std::sync::{Arc, Mutex};
41
42 struct MemorySink {
43 events: Arc<Mutex<Vec<ProxyEvent>>>,
44 flush_count: Arc<Mutex<u32>>,
45 }
46
47 impl MemorySink {
48 #[allow(clippy::type_complexity)]
49 fn new() -> (Self, Arc<Mutex<Vec<ProxyEvent>>>, Arc<Mutex<u32>>) {
50 let events = Arc::new(Mutex::new(Vec::new()));
51 let flush_count = Arc::new(Mutex::new(0u32));
52 (
53 Self {
54 events: events.clone(),
55 flush_count: flush_count.clone(),
56 },
57 events,
58 flush_count,
59 )
60 }
61 }
62
63 impl EventSink for MemorySink {
64 fn on_event(&self, event: &ProxyEvent) {
65 self.events.lock().unwrap().push(event.clone());
66 }
67 fn flush(&self) {
68 *self.flush_count.lock().unwrap() += 1;
69 }
70 fn name(&self) -> &'static str {
71 "memory"
72 }
73 }
74
75 fn test_request(note: &str) -> ProxyEvent {
76 ProxyEvent::Request(Box::new(RequestEvent {
77 id: uuid::Uuid::new_v4().to_string(),
78 ts: chrono::Utc::now().timestamp_millis(),
79 proxy: "test".into(),
80 session_id: None,
81 method: "POST".into(),
82 path: "/mcp".into(),
83 mcp_method: Some("tools/call".into()),
84 tool: Some("search".into()),
85 resource_uri: None,
86 prompt_name: None,
87 status: 200,
88 latency_us: 42_000,
89 upstream_us: Some(40_000),
90 request_size: Some(100),
91 response_size: Some(200),
92 error_code: None,
93 error_msg: None,
94 client_name: None,
95 client_version: None,
96 note: note.into(),
97 stage_timings: None,
98 }))
99 }
100
101 fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
102 let mut mgr = EventManager::new();
103 for s in sinks {
104 mgr.register(s);
105 }
106 mgr.start()
107 }
108
109 #[tokio::test]
110 async fn routes_to_single_sink() {
111 let (sink, events, _) = MemorySink::new();
112 let handle = start_with(vec![Box::new(sink)]);
113
114 handle.bus.emit(test_request("a"));
115 handle.bus.emit(test_request("b"));
116
117 handle.shutdown().await;
118
119 let events = events.lock().unwrap();
120 assert_eq!(events.len(), 2);
121 }
122
123 #[tokio::test]
124 async fn routes_to_multiple_sinks() {
125 let (sink1, events1, _) = MemorySink::new();
126 let (sink2, events2, _) = MemorySink::new();
127 let handle = start_with(vec![Box::new(sink1), Box::new(sink2)]);
128
129 handle.bus.emit(test_request("a"));
130
131 handle.shutdown().await;
132
133 assert_eq!(events1.lock().unwrap().len(), 1);
134 assert_eq!(events2.lock().unwrap().len(), 1);
135 }
136
137 #[tokio::test]
138 async fn flushes_on_shutdown() {
139 let (sink, _, flush_count) = MemorySink::new();
140 let handle = start_with(vec![Box::new(sink)]);
141
142 handle.bus.emit(test_request("a"));
143 handle.shutdown().await;
144
145 assert_eq!(*flush_count.lock().unwrap(), 1);
146 }
147
148 #[tokio::test]
149 async fn drains_channel_on_shutdown() {
150 let (sink, events, _) = MemorySink::new();
151 let handle = start_with(vec![Box::new(sink)]);
152
153 for _ in 0..100 {
154 handle.bus.emit(test_request("a"));
155 }
156
157 handle.shutdown().await;
158 assert_eq!(events.lock().unwrap().len(), 100);
159 }
160
161 #[tokio::test]
162 async fn works_with_no_sinks() {
163 let handle = EventManager::new().start();
164 handle.bus.emit(test_request("a"));
165 handle.shutdown().await;
166 }
167}