Skip to main content

obs_core/
bus.rs

1use crate::alert::{Alert, AlertSeverity};
2
3/// Receives alerts dispatched by the bus.
4pub trait AlertHandler: Send {
5    /// Called synchronously when an alert is dispatched.
6    fn on_alert(&mut self, alert: &Alert);
7}
8
9/// Filtering predicate - returns `true` if this alert should be delivered.
10pub type AlertFilter = Box<dyn Fn(&Alert) -> bool + Send>;
11
12struct Subscription {
13    handler: Box<dyn AlertHandler>,
14    min_severity: AlertSeverity,
15    source_filter: Option<String>,
16}
17
18impl Subscription {
19    fn matches(&self, alert: &Alert) -> bool {
20        if alert.severity < self.min_severity {
21            return false;
22        }
23        if let Some(ref src) = self.source_filter
24            && alert.source != *src
25        {
26            return false;
27        }
28        true
29    }
30}
31
32/// Central alert bus.
33///
34/// Components fire alerts via [`AlertBus::fire`]; subscribers receive them
35/// synchronously in the same call.
36///
37/// # Example
38/// ```
39/// use obs_core::bus::AlertBus;
40/// let bus = AlertBus::new();
41/// assert_eq!(bus.subscriber_count(), 0);
42/// ```
43pub struct AlertBus {
44    subscriptions: Vec<Subscription>,
45    /// All alerts ever fired (bounded by `history_limit`).
46    history: Vec<Alert>,
47    history_limit: usize,
48}
49
50impl AlertBus {
51    /// Create a new bus with the default history limit (1 000 entries).
52    pub fn new() -> Self {
53        Self::with_history_limit(1_000)
54    }
55
56    pub fn with_history_limit(limit: usize) -> Self {
57        Self {
58            subscriptions: Vec::new(),
59            history: Vec::new(),
60            history_limit: limit,
61        }
62    }
63
64    /// Register a handler that receives all alerts at or above `min_severity`.
65    /// If `source` is `Some`, only alerts from that source component are delivered.
66    pub fn subscribe(
67        &mut self,
68        handler: Box<dyn AlertHandler>,
69        min_severity: AlertSeverity,
70        source: Option<String>,
71    ) {
72        self.subscriptions.push(Subscription {
73            handler,
74            min_severity,
75            source_filter: source,
76        });
77    }
78
79    /// Dispatch `alert` to all matching subscribers and add it to history.
80    pub fn fire(&mut self, alert: Alert) {
81        for sub in &mut self.subscriptions {
82            if sub.matches(&alert) {
83                sub.handler.on_alert(&alert);
84            }
85        }
86        if self.history.len() >= self.history_limit {
87            self.history.remove(0);
88        }
89        self.history.push(alert);
90    }
91
92    /// All alerts in history (oldest first).
93    pub fn history(&self) -> &[Alert] {
94        &self.history
95    }
96
97    /// History filtered to `severity >= min`.
98    pub fn history_at_or_above(&self, min: AlertSeverity) -> impl Iterator<Item = &Alert> {
99        self.history.iter().filter(move |a| a.severity >= min)
100    }
101
102    /// History filtered to a single `source`.
103    pub fn history_for_source<'a>(&'a self, source: &str) -> impl Iterator<Item = &'a Alert> {
104        self.history.iter().filter(move |a| a.source == source)
105    }
106
107    pub fn subscriber_count(&self) -> usize {
108        self.subscriptions.len()
109    }
110}
111
112impl Default for AlertBus {
113    fn default() -> Self {
114        Self::new()
115    }
116}