Skip to main content

mcpr_core/event/
bus.rs

1//! Event bus — routes proxy events to registered sinks.
2//!
3//! The proxy hot path calls `emit()` (non-blocking channel send).
4//! A background tokio task reads events and fans out to all sinks.
5//!
6//! ```text
7//! Proxy hot path ──► EventBus::emit(ProxyEvent) ──► [StderrSink, SqliteSink, CloudSink]
8//! ```
9//!
10//! Build a bus via [`EventManager`](super::EventManager) — it collects
11//! sinks at startup and hands back an [`EventBusHandle`].
12
13use super::sink::EventSink;
14use super::types::ProxyEvent;
15use tokio::sync::mpsc;
16
17pub(super) const CHANNEL_CAPACITY: usize = 10_000;
18const BATCH_SIZE: usize = 256;
19const FLUSH_INTERVAL_MS: u64 = 5000;
20
21/// Routes proxy events to multiple sinks via a bounded async channel.
22///
23/// The proxy hot path calls `emit()` which does a non-blocking channel send.
24/// A background tokio task reads from the channel and fans out to all sinks.
25#[derive(Clone)]
26pub struct EventBus {
27    tx: mpsc::Sender<ProxyEvent>,
28}
29
30/// Handle returned when the bus starts — use it to drive graceful shutdown
31/// and to clone [`EventBus`] handles for emitters.
32pub struct EventBusHandle {
33    pub bus: EventBus,
34    task: tokio::task::JoinHandle<()>,
35    shutdown: mpsc::Sender<()>,
36}
37
38impl EventBusHandle {
39    /// Graceful shutdown: signals the background task and waits for it to drain.
40    pub async fn shutdown(self) {
41        let _ = self.shutdown.send(()).await;
42        let _ = self.task.await;
43    }
44}
45
46impl EventBus {
47    /// Emit a proxy event. Non-blocking — drops the event if the channel is
48    /// full (never blocks the proxy request path).
49    pub fn emit(&self, event: ProxyEvent) {
50        let _ = self.tx.try_send(event);
51    }
52}
53
54/// Spawn the background dispatch task and return a live [`EventBusHandle`].
55/// Callers should go through [`EventManager`](super::EventManager) rather
56/// than calling this directly.
57pub(super) fn spawn(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
58    let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
59    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
60
61    let task = tokio::spawn(bus_task(rx, shutdown_rx, sinks));
62
63    EventBusHandle {
64        bus: EventBus { tx },
65        task,
66        shutdown: shutdown_tx,
67    }
68}
69
70async fn bus_task(
71    mut rx: mpsc::Receiver<ProxyEvent>,
72    mut shutdown_rx: mpsc::Receiver<()>,
73    sinks: Vec<Box<dyn EventSink>>,
74) {
75    let mut batch = Vec::with_capacity(BATCH_SIZE);
76    let mut flush_interval =
77        tokio::time::interval(tokio::time::Duration::from_millis(FLUSH_INTERVAL_MS));
78    flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
79
80    loop {
81        tokio::select! {
82            Some(event) = rx.recv() => {
83                batch.push(event);
84
85                // Drain more events if available (non-blocking)
86                while batch.len() < BATCH_SIZE {
87                    match rx.try_recv() {
88                        Ok(event) => batch.push(event),
89                        Err(_) => break,
90                    }
91                }
92
93                dispatch_batch(&sinks, &batch);
94                batch.clear();
95            }
96            _ = flush_interval.tick() => {
97                for sink in &sinks {
98                    sink.flush();
99                }
100            }
101            _ = shutdown_rx.recv() => {
102                // Drain remaining events
103                while let Ok(event) = rx.try_recv() {
104                    batch.push(event);
105                }
106                if !batch.is_empty() {
107                    dispatch_batch(&sinks, &batch);
108                    batch.clear();
109                }
110                for sink in &sinks {
111                    sink.flush();
112                }
113                return;
114            }
115        }
116    }
117}
118
119fn dispatch_batch(sinks: &[Box<dyn EventSink>], batch: &[ProxyEvent]) {
120    for sink in sinks {
121        if batch.len() == 1 {
122            sink.on_event(&batch[0]);
123        } else {
124            sink.on_batch(batch);
125        }
126    }
127}