/// Defines [`EventBus`] and [`EventBusHandle`], both re-exported below.
pub mod bus;
/// Defines [`EventManager`], on which sinks are registered before it is started.
pub mod manager;
/// Defines the [`EventSink`] trait and [`NoopSink`], both re-exported below.
pub mod sink;
/// Defines the event payload types, including the [`ProxyEvent`] enum.
pub mod types;
// Re-exports so callers can name the main types from this module directly
// instead of reaching into the individual submodules.
pub use bus::{EventBus, EventBusHandle};
pub use manager::EventManager;
pub use sink::{EventSink, NoopSink};
pub use types::{
HeartbeatEvent, ProxyEvent, RequestEvent, SchemaVersionCreatedEvent, SessionEndEvent,
SessionStartEvent,
};
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
struct MemorySink {
events: Arc<Mutex<Vec<ProxyEvent>>>,
flush_count: Arc<Mutex<u32>>,
}
impl MemorySink {
#[allow(clippy::type_complexity)]
fn new() -> (Self, Arc<Mutex<Vec<ProxyEvent>>>, Arc<Mutex<u32>>) {
let events = Arc::new(Mutex::new(Vec::new()));
let flush_count = Arc::new(Mutex::new(0u32));
(
Self {
events: events.clone(),
flush_count: flush_count.clone(),
},
events,
flush_count,
)
}
}
impl EventSink for MemorySink {
    /// Appends a clone of `event` to the shared event list.
    fn on_event(&self, event: &ProxyEvent) {
        let mut recorded = self.events.lock().unwrap();
        recorded.push(event.clone());
    }

    /// Increments the shared flush counter by one.
    fn flush(&self) {
        let mut flushes = self.flush_count.lock().unwrap();
        *flushes += 1;
    }

    /// Fixed identifier for this sink.
    fn name(&self) -> &'static str {
        "memory"
    }
}
/// Builds a `ProxyEvent::Request` fixture with a fresh random id, the current
/// UTC timestamp in milliseconds, and otherwise fixed field values.
///
/// `note` is stored in the event's `note` field, which lets a test tell
/// fixtures apart.
fn test_request(note: &str) -> ProxyEvent {
    // The id is generated before the timestamp is read, as in the struct
    // literal's field order.
    let id = uuid::Uuid::new_v4().to_string();
    let ts = chrono::Utc::now().timestamp_millis();

    let request = RequestEvent {
        id,
        ts,
        proxy: "test".into(),
        session_id: None,
        method: "POST".into(),
        path: "/mcp".into(),
        mcp_method: Some("tools/call".into()),
        tool: Some("search".into()),
        resource_uri: None,
        prompt_name: None,
        status: 200,
        latency_us: 42_000,
        upstream_us: Some(40_000),
        request_size: Some(100),
        response_size: Some(200),
        error_code: None,
        error_msg: None,
        client_name: None,
        client_version: None,
        note: note.into(),
        stage_timings: None,
    };

    ProxyEvent::Request(Box::new(request))
}
/// Registers every sink in `sinks`, in order, on a new `EventManager`, then
/// starts the manager and returns the resulting handle.
fn start_with(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
    let mut manager = EventManager::new();
    sinks.into_iter().for_each(|sink| {
        manager.register(sink);
    });
    manager.start()
}
/// Two events emitted on a bus with one registered sink must both have been
/// recorded by that sink once shutdown has completed.
#[tokio::test]
async fn routes_to_single_sink() {
    let (sink, recorded, _) = MemorySink::new();
    let handle = start_with(vec![Box::new(sink)]);

    for note in ["a", "b"] {
        handle.bus.emit(test_request(note));
    }
    handle.shutdown().await;

    assert_eq!(recorded.lock().unwrap().len(), 2);
}
#[tokio::test]
async fn routes_to_multiple_sinks() {
let (sink1, events1, _) = MemorySink::new();
let (sink2, events2, _) = MemorySink::new();
let handle = start_with(vec![Box::new(sink1), Box::new(sink2)]);
handle.bus.emit(test_request("a"));
handle.shutdown().await;
assert_eq!(events1.lock().unwrap().len(), 1);
assert_eq!(events2.lock().unwrap().len(), 1);
}
/// After one emitted event and a completed shutdown, the sink's `flush` must
/// have been called exactly once.
#[tokio::test]
async fn flushes_on_shutdown() {
    let (sink, _, flush_count) = MemorySink::new();
    let handle = start_with(vec![Box::new(sink)]);

    handle.bus.emit(test_request("a"));
    handle.shutdown().await;

    let flushes = *flush_count.lock().unwrap();
    assert_eq!(flushes, 1);
}
/// Every one of a burst of events emitted right before shutdown must have
/// reached the sink by the time shutdown completes.
#[tokio::test]
async fn drains_channel_on_shutdown() {
    // Number of events emitted back-to-back before shutting down.
    const EMITTED: usize = 100;

    let (sink, recorded, _) = MemorySink::new();
    let handle = start_with(vec![Box::new(sink)]);

    for _ in 0..EMITTED {
        handle.bus.emit(test_request("a"));
    }
    handle.shutdown().await;

    assert_eq!(recorded.lock().unwrap().len(), EMITTED);
}
/// Emitting on a bus with no registered sinks and then shutting down must
/// complete; the test has no assertions beyond finishing without a panic.
#[tokio::test]
async fn works_with_no_sinks() {
    let manager = EventManager::new();
    let handle = manager.start();

    handle.bus.emit(test_request("a"));
    handle.shutdown().await;
}
}