Skip to main content

opendev_runtime/event_bus/
subscribers.rs

1//! Subscriber types for the event bus.
2//!
3//! [`TopicSubscriber`] provides topic-based filtering, while
4//! [`FilteredSubscriber`] provides legacy string-based filtering.
5
6use std::collections::HashSet;
7
8use tokio::sync::broadcast;
9use tracing::debug;
10
11use super::{Event, EventBus, EventTopic, RuntimeEvent};
12
13// ---------------------------------------------------------------------------
14// TopicSubscriber -- topic-based filtering (#94)
15// ---------------------------------------------------------------------------
16
17/// A subscriber that only receives events matching its declared topics.
18pub struct TopicSubscriber {
19    receiver: broadcast::Receiver<RuntimeEvent>,
20    topics: HashSet<EventTopic>,
21}
22
23impl TopicSubscriber {
24    /// Create a new topic subscriber.
25    pub(super) fn new(
26        receiver: broadcast::Receiver<RuntimeEvent>,
27        topics: HashSet<EventTopic>,
28    ) -> Self {
29        Self { receiver, topics }
30    }
31
32    /// Receive the next event matching the subscriber's topics.
33    pub async fn recv(&mut self) -> Option<RuntimeEvent> {
34        loop {
35            match self.receiver.recv().await {
36                Ok(event) => {
37                    if self.topics.contains(&event.topic()) {
38                        return Some(event);
39                    }
40                    // Not interested -- skip.
41                }
42                Err(broadcast::error::RecvError::Lagged(n)) => {
43                    debug!("TopicSubscriber lagged, missed {n} events");
44                }
45                Err(broadcast::error::RecvError::Closed) => return None,
46            }
47        }
48    }
49
50    /// Return the set of topics this subscriber is interested in.
51    pub fn topics(&self) -> &HashSet<EventTopic> {
52        &self.topics
53    }
54}
55
56// ---------------------------------------------------------------------------
57// FilteredSubscriber -- legacy string-based filtering (backward compat)
58// ---------------------------------------------------------------------------
59
60/// Filtered event subscriber -- only receives events matching a filter.
61///
62/// Works with the legacy `event_type` string inside `RuntimeEvent::Custom`.
63pub struct FilteredSubscriber {
64    receiver: broadcast::Receiver<RuntimeEvent>,
65    event_types: Option<Vec<String>>,
66}
67
68impl FilteredSubscriber {
69    /// Create a filtered subscriber.
70    pub fn new(bus: &EventBus, event_types: Option<Vec<String>>) -> Self {
71        Self {
72            receiver: bus.subscribe(),
73            event_types,
74        }
75    }
76
77    /// Receive the next matching event (returns a legacy `Event`).
78    pub async fn recv(&mut self) -> Option<Event> {
79        loop {
80            match self.receiver.recv().await {
81                Ok(runtime_event) => {
82                    // Convert RuntimeEvent to legacy Event for compat.
83                    let legacy = match &runtime_event {
84                        RuntimeEvent::Custom {
85                            event_type,
86                            source,
87                            data,
88                            timestamp_ms,
89                        } => Event {
90                            event_type: event_type.clone(),
91                            source: source.clone(),
92                            data: data.clone(),
93                            timestamp_ms: *timestamp_ms,
94                        },
95                        other => Event {
96                            event_type: format!("{:?}", other.topic()),
97                            source: String::new(),
98                            data: serde_json::to_value(other).unwrap_or(serde_json::Value::Null),
99                            timestamp_ms: other.timestamp_ms(),
100                        },
101                    };
102
103                    if let Some(ref types) = self.event_types
104                        && !types.iter().any(|t| t == &legacy.event_type)
105                    {
106                        continue;
107                    }
108                    return Some(legacy);
109                }
110                Err(broadcast::error::RecvError::Lagged(n)) => {
111                    debug!("Subscriber lagged, missed {n} events");
112                    continue;
113                }
114                Err(broadcast::error::RecvError::Closed) => return None,
115            }
116        }
117    }
118}