// opendev_runtime/event_bus/mod.rs
mod events;
10mod subscribers;
11mod utils;
12
13use std::collections::HashSet;
14use std::sync::Arc;
15
16use tokio::sync::broadcast;
17use tracing::debug;
18
19pub use self::events::{Event, EventTopic, RuntimeEvent, now_ms};
21pub use self::subscribers::{FilteredSubscriber, TopicSubscriber};
22pub use self::utils::{group_events_by_type, group_runtime_events_by_topic};
23
/// Channel capacity used when a bus is built through [`EventBus::new`],
/// which forwards this value to [`EventBus::with_capacity`].
const DEFAULT_CAPACITY: usize = 256;
26
27#[derive(Clone)]
33pub struct EventBus {
34 inner: Arc<EventBusInner>,
35}
36
37struct EventBusInner {
38 sender: broadcast::Sender<RuntimeEvent>,
39 _capacity: usize,
40}
41
42impl EventBus {
43 pub fn new() -> Self {
45 Self::with_capacity(DEFAULT_CAPACITY)
46 }
47
48 pub fn with_capacity(capacity: usize) -> Self {
50 let (sender, _) = broadcast::channel(capacity);
51 Self {
52 inner: Arc::new(EventBusInner {
53 sender,
54 _capacity: capacity,
55 }),
56 }
57 }
58
59 pub fn publish(&self, event: RuntimeEvent) {
61 let topic = event.topic();
62 match self.inner.sender.send(event) {
63 Ok(n) => debug!("Event {:?} sent to {} subscribers", topic, n),
64 Err(_) => debug!("Event {:?} published with no subscribers", topic),
65 }
66 }
67
68 pub fn emit(&self, event_type: &str, source: &str, data: serde_json::Value) {
70 let event = Event::new(event_type, source, data);
71 self.publish(event.into_runtime_event());
72 }
73
74 pub fn subscribe(&self) -> broadcast::Receiver<RuntimeEvent> {
76 self.inner.sender.subscribe()
77 }
78
79 pub fn subscribe_topics(&self, topics: HashSet<EventTopic>) -> TopicSubscriber {
84 TopicSubscriber::new(self.inner.sender.subscribe(), topics)
85 }
86
87 pub fn subscriber_count(&self) -> usize {
89 self.inner.sender.receiver_count()
90 }
91}
92
93impl Default for EventBus {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl std::fmt::Debug for EventBus {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.debug_struct("EventBus")
102 .field("subscribers", &self.subscriber_count())
103 .finish()
104 }
105}
106
107#[cfg(test)]
112mod tests;