veilid_tools/
event_bus.rs

1//! Event Bus
2
3use super::*;
4use futures_util::stream::{FuturesUnordered, StreamExt};
5use stop_token::future::FutureExt as _;
6
7use std::any::{Any, TypeId};
8
9type AnyEventHandler =
10    Arc<dyn Fn(Arc<dyn Any + Send + Sync>) -> SendPinBoxFuture<()> + Send + Sync>;
11type SubscriptionId = u64;
12
13#[derive(Debug)]
14pub struct EventBusSubscription {
15    id: SubscriptionId,
16    type_id: TypeId,
17}
18
19struct QueuedEvent {
20    evt: Arc<dyn Any + Send + Sync>,
21    type_name: &'static str,
22}
23
24struct EventBusUnlockedInner {
25    tx: flume::Sender<QueuedEvent>,
26    rx: flume::Receiver<QueuedEvent>,
27    startup_lock: StartupLock,
28}
29
30impl fmt::Debug for EventBusUnlockedInner {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_struct("EventBusUnlockedInner")
33            .field("tx", &self.tx)
34            .field("rx", &self.rx)
35            .field("startup_lock", &self.startup_lock)
36            .finish()
37    }
38}
39
40struct EventBusInner {
41    handlers: HashMap<TypeId, Vec<(SubscriptionId, AnyEventHandler)>>,
42    next_sub_id: SubscriptionId,
43    free_sub_ids: Vec<SubscriptionId>,
44    stop_source: Option<StopSource>,
45    bus_processor_jh: Option<MustJoinHandle<()>>,
46}
47
48impl fmt::Debug for EventBusInner {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        f.debug_struct("EventBusInner")
51            .field("handlers.len", &self.handlers.len())
52            .field("next_sub_id", &self.next_sub_id)
53            .field("free_sub_ids", &self.free_sub_ids)
54            .finish()
55    }
56}
57
58/// Event bus
59///
60/// Asynchronously handles events of arbitrary Any type
61/// by passing them in-order to a set of registered async 'handler' functions.
62/// Handlers are processes in an unordered fashion, but an event is fully handled by all handlers
63/// until the next event in the posted event stream is processed.
64#[derive(Debug, Clone)]
65pub struct EventBus {
66    unlocked_inner: Arc<EventBusUnlockedInner>,
67    inner: Arc<Mutex<EventBusInner>>,
68}
69
70impl Default for EventBus {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl EventBus {
77    ////////////////////////////////////////////////////////////////////
78    // Public interface
79
80    /// Create a new EventBus
81    pub fn new() -> Self {
82        let (tx, rx) = flume::unbounded();
83        Self {
84            unlocked_inner: Arc::new(EventBusUnlockedInner {
85                tx,
86                rx,
87                startup_lock: StartupLock::new(),
88            }),
89            inner: Arc::new(Mutex::new(Self::new_inner())),
90        }
91    }
92
93    /// Start up the EventBus background processor
94    pub async fn startup(&self) -> Result<(), StartupLockAlreadyStartedError> {
95        let guard = self.unlocked_inner.startup_lock.startup()?;
96        {
97            let mut inner = self.inner.lock();
98            let stop_source = StopSource::new();
99            let stop_token = stop_source.token();
100            inner.stop_source = Some(stop_source);
101
102            let bus_processor_jh = spawn(
103                "event bus processor",
104                self.clone().bus_processor(stop_token),
105            );
106            inner.bus_processor_jh = Some(bus_processor_jh);
107        }
108
109        guard.success();
110        Ok(())
111    }
112
113    /// Shut down EventBus background processing
114    /// This unregisters all handlers as well and discards any unprocessed events
115    pub async fn shutdown(&self) {
116        let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
117            return;
118        };
119
120        let opt_jh = {
121            let mut inner = self.inner.lock();
122            drop(inner.stop_source.take());
123            inner.bus_processor_jh.take()
124        };
125
126        if let Some(jh) = opt_jh {
127            jh.await;
128        }
129
130        *self.inner.lock() = Self::new_inner();
131
132        guard.success();
133    }
134
135    /// Post an event to be processed
136    pub fn post<E: Any + Send + Sync + 'static>(
137        &self,
138        evt: E,
139    ) -> Result<(), StartupLockNotStartedError> {
140        let _guard = self.unlocked_inner.startup_lock.enter()?;
141
142        if let Err(e) = self.unlocked_inner.tx.send(QueuedEvent {
143            evt: Arc::new(evt),
144            type_name: std::any::type_name::<E>(),
145        }) {
146            error!("{}", e);
147        }
148        Ok(())
149    }
150
151    /// Subscribe a handler to handle all posted events of a particular type
152    /// Returns an subscription object that can be used to cancel this specific subscription if desired
153    pub fn subscribe<
154        E: Any + Send + Sync + 'static,
155        F: Fn(Arc<E>) -> SendPinBoxFuture<()> + Send + Sync + 'static,
156    >(
157        &self,
158        handler: F,
159    ) -> EventBusSubscription {
160        let handler = Arc::new(handler);
161        let type_id = TypeId::of::<E>();
162        let mut inner = self.inner.lock();
163
164        let id = inner.free_sub_ids.pop().unwrap_or_else(|| {
165            let id = inner.next_sub_id;
166            inner.next_sub_id += 1;
167            id
168        });
169
170        inner.handlers.entry(type_id).or_default().push((
171            id,
172            Arc::new(move |any_evt| {
173                let handler = handler.clone();
174                Box::pin(async move {
175                    handler(any_evt.downcast::<E>().unwrap()).await;
176                })
177            }),
178        ));
179
180        EventBusSubscription { id, type_id }
181    }
182
183    /// Given a subscription object returned from `subscribe`, removes the
184    /// subscription for the EventBus. The handler will no longer be called.
185    pub fn unsubscribe(&self, sub: EventBusSubscription) {
186        let mut inner = self.inner.lock();
187
188        inner.handlers.entry(sub.type_id).and_modify(|e| {
189            let index = e.iter().position(|x| x.0 == sub.id).unwrap();
190            e.remove(index);
191        });
192
193        inner.free_sub_ids.push(sub.id);
194    }
195
196    /// Returns the number of unprocessed events remaining
197    pub fn len(&self) -> usize {
198        self.unlocked_inner.rx.len()
199    }
200
201    /// Checks if the bus has no events
202    pub fn is_empty(&self) -> bool {
203        self.unlocked_inner.rx.is_empty()
204    }
205
206    ////////////////////////////////////////////////////////////////////
207    // Internal implementation
208
209    fn new_inner() -> EventBusInner {
210        EventBusInner {
211            handlers: HashMap::new(),
212            next_sub_id: 0,
213            free_sub_ids: vec![],
214            stop_source: None,
215            bus_processor_jh: None,
216        }
217    }
218
219    async fn bus_processor(self, stop_token: StopToken) {
220        while let Ok(Ok(qe)) = self
221            .unlocked_inner
222            .rx
223            .recv_async()
224            .timeout_at(stop_token.clone())
225            .await
226        {
227            let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
228                break;
229            };
230            let type_id = (qe.evt.as_ref()).type_id();
231            let type_name = qe.type_name;
232
233            let opt_handlers: Option<FuturesUnordered<_>> = {
234                let mut inner = self.inner.lock();
235                match inner.handlers.entry(type_id) {
236                    std::collections::hash_map::Entry::Occupied(entry) => Some(
237                        entry
238                            .get()
239                            .iter()
240                            .cloned()
241                            .map(|(_id, handler)| handler(qe.evt.clone()))
242                            .collect(),
243                    ),
244                    std::collections::hash_map::Entry::Vacant(_) => {
245                        error!("no handlers for event: {}", type_name);
246                        None
247                    }
248                }
249            };
250
251            // Process all handlers for this event simultaneously
252            if let Some(mut handlers) = opt_handlers {
253                while let Ok(Some(_)) = handlers.next().timeout_at(stop_token.clone()).await {}
254            }
255        }
256    }
257}