1use super::sink::EventSink;
14use super::types::ProxyEvent;
15use tokio::sync::mpsc;
16
17pub(super) const CHANNEL_CAPACITY: usize = 10_000;
18const BATCH_SIZE: usize = 256;
19const FLUSH_INTERVAL_MS: u64 = 5000;
20
21#[derive(Clone)]
26pub struct EventBus {
27 tx: mpsc::Sender<ProxyEvent>,
28}
29
30pub struct EventBusHandle {
33 pub bus: EventBus,
34 task: tokio::task::JoinHandle<()>,
35 shutdown: mpsc::Sender<()>,
36}
37
38impl EventBusHandle {
39 pub async fn shutdown(self) {
41 let _ = self.shutdown.send(()).await;
42 let _ = self.task.await;
43 }
44}
45
46impl EventBus {
47 pub fn emit(&self, event: ProxyEvent) {
50 let _ = self.tx.try_send(event);
51 }
52}
53
54pub(super) fn spawn(sinks: Vec<Box<dyn EventSink>>) -> EventBusHandle {
58 let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
59 let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
60
61 let task = tokio::spawn(bus_task(rx, shutdown_rx, sinks));
62
63 EventBusHandle {
64 bus: EventBus { tx },
65 task,
66 shutdown: shutdown_tx,
67 }
68}
69
70async fn bus_task(
71 mut rx: mpsc::Receiver<ProxyEvent>,
72 mut shutdown_rx: mpsc::Receiver<()>,
73 sinks: Vec<Box<dyn EventSink>>,
74) {
75 let mut batch = Vec::with_capacity(BATCH_SIZE);
76 let mut flush_interval =
77 tokio::time::interval(tokio::time::Duration::from_millis(FLUSH_INTERVAL_MS));
78 flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
79
80 loop {
81 tokio::select! {
82 Some(event) = rx.recv() => {
83 batch.push(event);
84
85 while batch.len() < BATCH_SIZE {
87 match rx.try_recv() {
88 Ok(event) => batch.push(event),
89 Err(_) => break,
90 }
91 }
92
93 dispatch_batch(&sinks, &batch);
94 batch.clear();
95 }
96 _ = flush_interval.tick() => {
97 for sink in &sinks {
98 sink.flush();
99 }
100 }
101 _ = shutdown_rx.recv() => {
102 while let Ok(event) = rx.try_recv() {
104 batch.push(event);
105 }
106 if !batch.is_empty() {
107 dispatch_batch(&sinks, &batch);
108 batch.clear();
109 }
110 for sink in &sinks {
111 sink.flush();
112 }
113 return;
114 }
115 }
116 }
117}
118
119fn dispatch_batch(sinks: &[Box<dyn EventSink>], batch: &[ProxyEvent]) {
120 for sink in sinks {
121 if batch.len() == 1 {
122 sink.on_event(&batch[0]);
123 } else {
124 sink.on_batch(batch);
125 }
126 }
127}