opendev_runtime/event_bus/subscribers.rs
1//! Subscriber types for the event bus.
2//!
3//! [`TopicSubscriber`] provides topic-based filtering, while
4//! [`FilteredSubscriber`] provides legacy string-based filtering.
5
6use std::collections::HashSet;
7
8use tokio::sync::broadcast;
9use tracing::debug;
10
11use super::{Event, EventBus, EventTopic, RuntimeEvent};
12
13// ---------------------------------------------------------------------------
14// TopicSubscriber -- topic-based filtering (#94)
15// ---------------------------------------------------------------------------
16
/// A subscriber that only receives events matching its declared topics.
///
/// Wraps a broadcast receiver of [`RuntimeEvent`]s together with the set of
/// [`EventTopic`]s the subscriber cares about. `recv` silently skips every
/// event whose `topic()` is not contained in that set.
pub struct TopicSubscriber {
    /// Broadcast channel endpoint the raw, unfiltered events are read from.
    receiver: broadcast::Receiver<RuntimeEvent>,
    /// Topics that are let through by `recv`; everything else is dropped.
    topics: HashSet<EventTopic>,
}
22
23impl TopicSubscriber {
24 /// Create a new topic subscriber.
25 pub(super) fn new(
26 receiver: broadcast::Receiver<RuntimeEvent>,
27 topics: HashSet<EventTopic>,
28 ) -> Self {
29 Self { receiver, topics }
30 }
31
32 /// Receive the next event matching the subscriber's topics.
33 pub async fn recv(&mut self) -> Option<RuntimeEvent> {
34 loop {
35 match self.receiver.recv().await {
36 Ok(event) => {
37 if self.topics.contains(&event.topic()) {
38 return Some(event);
39 }
40 // Not interested -- skip.
41 }
42 Err(broadcast::error::RecvError::Lagged(n)) => {
43 debug!("TopicSubscriber lagged, missed {n} events");
44 }
45 Err(broadcast::error::RecvError::Closed) => return None,
46 }
47 }
48 }
49
50 /// Return the set of topics this subscriber is interested in.
51 pub fn topics(&self) -> &HashSet<EventTopic> {
52 &self.topics
53 }
54}
55
56// ---------------------------------------------------------------------------
57// FilteredSubscriber -- legacy string-based filtering (backward compat)
58// ---------------------------------------------------------------------------
59
/// Filtered event subscriber -- only receives events matching a filter.
///
/// Works with the legacy `event_type` string inside `RuntimeEvent::Custom`.
/// Events of any other variant are converted to a legacy `Event` whose
/// `event_type` is the `Debug` rendering of the event's topic, and are
/// filtered on that string.
pub struct FilteredSubscriber {
    /// Broadcast channel endpoint the raw, unfiltered events are read from.
    receiver: broadcast::Receiver<RuntimeEvent>,
    /// Allow-list of legacy event type strings. `None` means no filtering:
    /// every event is delivered.
    event_types: Option<Vec<String>>,
}
67
68impl FilteredSubscriber {
69 /// Create a filtered subscriber.
70 pub fn new(bus: &EventBus, event_types: Option<Vec<String>>) -> Self {
71 Self {
72 receiver: bus.subscribe(),
73 event_types,
74 }
75 }
76
77 /// Receive the next matching event (returns a legacy `Event`).
78 pub async fn recv(&mut self) -> Option<Event> {
79 loop {
80 match self.receiver.recv().await {
81 Ok(runtime_event) => {
82 // Convert RuntimeEvent to legacy Event for compat.
83 let legacy = match &runtime_event {
84 RuntimeEvent::Custom {
85 event_type,
86 source,
87 data,
88 timestamp_ms,
89 } => Event {
90 event_type: event_type.clone(),
91 source: source.clone(),
92 data: data.clone(),
93 timestamp_ms: *timestamp_ms,
94 },
95 other => Event {
96 event_type: format!("{:?}", other.topic()),
97 source: String::new(),
98 data: serde_json::to_value(other).unwrap_or(serde_json::Value::Null),
99 timestamp_ms: other.timestamp_ms(),
100 },
101 };
102
103 if let Some(ref types) = self.event_types
104 && !types.iter().any(|t| t == &legacy.event_type)
105 {
106 continue;
107 }
108 return Some(legacy);
109 }
110 Err(broadcast::error::RecvError::Lagged(n)) => {
111 debug!("Subscriber lagged, missed {n} events");
112 continue;
113 }
114 Err(broadcast::error::RecvError::Closed) => return None,
115 }
116 }
117 }
118}