veilid_tools/
event_bus.rs1use super::*;
4use futures_util::stream::{FuturesUnordered, StreamExt};
5use stop_token::future::FutureExt as _;
6
7use std::any::{Any, TypeId};
8
9type AnyEventHandler =
10 Arc<dyn Fn(Arc<dyn Any + Send + Sync>) -> SendPinBoxFuture<()> + Send + Sync>;
11type SubscriptionId = u64;
12
13#[derive(Debug)]
14pub struct EventBusSubscription {
15 id: SubscriptionId,
16 type_id: TypeId,
17}
18
19struct QueuedEvent {
20 evt: Arc<dyn Any + Send + Sync>,
21 type_name: &'static str,
22}
23
24struct EventBusUnlockedInner {
25 tx: flume::Sender<QueuedEvent>,
26 rx: flume::Receiver<QueuedEvent>,
27 startup_lock: StartupLock,
28}
29
30impl fmt::Debug for EventBusUnlockedInner {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.debug_struct("EventBusUnlockedInner")
33 .field("tx", &self.tx)
34 .field("rx", &self.rx)
35 .field("startup_lock", &self.startup_lock)
36 .finish()
37 }
38}
39
40struct EventBusInner {
41 handlers: HashMap<TypeId, Vec<(SubscriptionId, AnyEventHandler)>>,
42 next_sub_id: SubscriptionId,
43 free_sub_ids: Vec<SubscriptionId>,
44 stop_source: Option<StopSource>,
45 bus_processor_jh: Option<MustJoinHandle<()>>,
46}
47
48impl fmt::Debug for EventBusInner {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 f.debug_struct("EventBusInner")
51 .field("handlers.len", &self.handlers.len())
52 .field("next_sub_id", &self.next_sub_id)
53 .field("free_sub_ids", &self.free_sub_ids)
54 .finish()
55 }
56}
57
58#[derive(Debug, Clone)]
65pub struct EventBus {
66 unlocked_inner: Arc<EventBusUnlockedInner>,
67 inner: Arc<Mutex<EventBusInner>>,
68}
69
70impl Default for EventBus {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl EventBus {
77 pub fn new() -> Self {
82 let (tx, rx) = flume::unbounded();
83 Self {
84 unlocked_inner: Arc::new(EventBusUnlockedInner {
85 tx,
86 rx,
87 startup_lock: StartupLock::new(),
88 }),
89 inner: Arc::new(Mutex::new(Self::new_inner())),
90 }
91 }
92
93 pub async fn startup(&self) -> Result<(), StartupLockAlreadyStartedError> {
95 let guard = self.unlocked_inner.startup_lock.startup()?;
96 {
97 let mut inner = self.inner.lock();
98 let stop_source = StopSource::new();
99 let stop_token = stop_source.token();
100 inner.stop_source = Some(stop_source);
101
102 let bus_processor_jh = spawn(
103 "event bus processor",
104 self.clone().bus_processor(stop_token),
105 );
106 inner.bus_processor_jh = Some(bus_processor_jh);
107 }
108
109 guard.success();
110 Ok(())
111 }
112
113 pub async fn shutdown(&self) {
116 let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
117 return;
118 };
119
120 let opt_jh = {
121 let mut inner = self.inner.lock();
122 drop(inner.stop_source.take());
123 inner.bus_processor_jh.take()
124 };
125
126 if let Some(jh) = opt_jh {
127 jh.await;
128 }
129
130 *self.inner.lock() = Self::new_inner();
131
132 guard.success();
133 }
134
135 pub fn post<E: Any + Send + Sync + 'static>(
137 &self,
138 evt: E,
139 ) -> Result<(), StartupLockNotStartedError> {
140 let _guard = self.unlocked_inner.startup_lock.enter()?;
141
142 if let Err(e) = self.unlocked_inner.tx.send(QueuedEvent {
143 evt: Arc::new(evt),
144 type_name: std::any::type_name::<E>(),
145 }) {
146 error!("{}", e);
147 }
148 Ok(())
149 }
150
151 pub fn subscribe<
154 E: Any + Send + Sync + 'static,
155 F: Fn(Arc<E>) -> SendPinBoxFuture<()> + Send + Sync + 'static,
156 >(
157 &self,
158 handler: F,
159 ) -> EventBusSubscription {
160 let handler = Arc::new(handler);
161 let type_id = TypeId::of::<E>();
162 let mut inner = self.inner.lock();
163
164 let id = inner.free_sub_ids.pop().unwrap_or_else(|| {
165 let id = inner.next_sub_id;
166 inner.next_sub_id += 1;
167 id
168 });
169
170 inner.handlers.entry(type_id).or_default().push((
171 id,
172 Arc::new(move |any_evt| {
173 let handler = handler.clone();
174 Box::pin(async move {
175 handler(any_evt.downcast::<E>().unwrap()).await;
176 })
177 }),
178 ));
179
180 EventBusSubscription { id, type_id }
181 }
182
183 pub fn unsubscribe(&self, sub: EventBusSubscription) {
186 let mut inner = self.inner.lock();
187
188 inner.handlers.entry(sub.type_id).and_modify(|e| {
189 let index = e.iter().position(|x| x.0 == sub.id).unwrap();
190 e.remove(index);
191 });
192
193 inner.free_sub_ids.push(sub.id);
194 }
195
196 pub fn len(&self) -> usize {
198 self.unlocked_inner.rx.len()
199 }
200
201 pub fn is_empty(&self) -> bool {
203 self.unlocked_inner.rx.is_empty()
204 }
205
206 fn new_inner() -> EventBusInner {
210 EventBusInner {
211 handlers: HashMap::new(),
212 next_sub_id: 0,
213 free_sub_ids: vec![],
214 stop_source: None,
215 bus_processor_jh: None,
216 }
217 }
218
219 async fn bus_processor(self, stop_token: StopToken) {
220 while let Ok(Ok(qe)) = self
221 .unlocked_inner
222 .rx
223 .recv_async()
224 .timeout_at(stop_token.clone())
225 .await
226 {
227 let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
228 break;
229 };
230 let type_id = (qe.evt.as_ref()).type_id();
231 let type_name = qe.type_name;
232
233 let opt_handlers: Option<FuturesUnordered<_>> = {
234 let mut inner = self.inner.lock();
235 match inner.handlers.entry(type_id) {
236 std::collections::hash_map::Entry::Occupied(entry) => Some(
237 entry
238 .get()
239 .iter()
240 .cloned()
241 .map(|(_id, handler)| handler(qe.evt.clone()))
242 .collect(),
243 ),
244 std::collections::hash_map::Entry::Vacant(_) => {
245 error!("no handlers for event: {}", type_name);
246 None
247 }
248 }
249 };
250
251 if let Some(mut handlers) = opt_handlers {
253 while let Ok(Some(_)) = handlers.next().timeout_at(stop_token.clone()).await {}
254 }
255 }
256 }
257}