Skip to main content

opendev_runtime/event_bus/
mod.rs

1//! Typed event bus for decoupled inter-component communication.
2//!
3//! Components publish typed [`RuntimeEvent`] variants; subscribers receive
4//! copies asynchronously. Supports topic-based filtering so each subscriber
5//! only receives events it is interested in.
6//!
7//! Events are broadcast via `tokio::sync::broadcast`.
8
9mod events;
10mod subscribers;
11mod utils;
12
13use std::collections::HashSet;
14use std::sync::Arc;
15
16use tokio::sync::broadcast;
17use tracing::debug;
18
19// Re-export public API so that `crate::event_bus::X` paths remain unchanged.
20pub use self::events::{Event, EventTopic, RuntimeEvent, now_ms};
21pub use self::subscribers::{FilteredSubscriber, TopicSubscriber};
22pub use self::utils::{group_events_by_type, group_runtime_events_by_topic};
23
24/// Maximum number of events buffered per channel.
25const DEFAULT_CAPACITY: usize = 256;
26
27// ---------------------------------------------------------------------------
28// EventBus -- typed publish / subscribe (#93 + #94)
29// ---------------------------------------------------------------------------
30
31/// Typed event bus for broadcasting [`RuntimeEvent`] instances.
32#[derive(Clone)]
33pub struct EventBus {
34    inner: Arc<EventBusInner>,
35}
36
37struct EventBusInner {
38    sender: broadcast::Sender<RuntimeEvent>,
39    _capacity: usize,
40}
41
42impl EventBus {
43    /// Create a new event bus with the default capacity.
44    pub fn new() -> Self {
45        Self::with_capacity(DEFAULT_CAPACITY)
46    }
47
48    /// Create a new event bus with a specific capacity.
49    pub fn with_capacity(capacity: usize) -> Self {
50        let (sender, _) = broadcast::channel(capacity);
51        Self {
52            inner: Arc::new(EventBusInner {
53                sender,
54                _capacity: capacity,
55            }),
56        }
57    }
58
59    /// Publish a typed event to all subscribers.
60    pub fn publish(&self, event: RuntimeEvent) {
61        let topic = event.topic();
62        match self.inner.sender.send(event) {
63            Ok(n) => debug!("Event {:?} sent to {} subscribers", topic, n),
64            Err(_) => debug!("Event {:?} published with no subscribers", topic),
65        }
66    }
67
68    /// Convenience: publish a legacy `Event` by converting it to `RuntimeEvent::Custom`.
69    pub fn emit(&self, event_type: &str, source: &str, data: serde_json::Value) {
70        let event = Event::new(event_type, source, data);
71        self.publish(event.into_runtime_event());
72    }
73
74    /// Subscribe to *all* events (unfiltered).
75    pub fn subscribe(&self) -> broadcast::Receiver<RuntimeEvent> {
76        self.inner.sender.subscribe()
77    }
78
79    /// Subscribe with topic-based filtering (#94).
80    ///
81    /// The returned [`TopicSubscriber`] only yields events whose topic is in
82    /// the given set.
83    pub fn subscribe_topics(&self, topics: HashSet<EventTopic>) -> TopicSubscriber {
84        TopicSubscriber::new(self.inner.sender.subscribe(), topics)
85    }
86
87    /// Number of active subscribers.
88    pub fn subscriber_count(&self) -> usize {
89        self.inner.sender.receiver_count()
90    }
91}
92
93impl Default for EventBus {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl std::fmt::Debug for EventBus {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.debug_struct("EventBus")
102            .field("subscribers", &self.subscriber_count())
103            .finish()
104    }
105}
106
107// ---------------------------------------------------------------------------
108// Tests
109// ---------------------------------------------------------------------------
110
111#[cfg(test)]
112mod tests;