Skip to main content

gatel_core/
events.rs

1//! Lightweight pub/sub event bus for inter-module communication.
2//!
3//! Modules can emit events when interesting things happen (config reload,
4//! certificate issued, upstream health change, etc.) and other modules can
5//! subscribe to those events to react.
6//!
7//! # Example
8//!
9//! ```ignore
10//! let bus = EventBus::new();
11//! let mut rx = bus.subscribe();
12//! bus.emit(Event::ConfigReloaded);
13//! let event = rx.recv().await.unwrap();
14//! ```
15
16use std::fmt;
17
18use tokio::sync::broadcast;
19
/// Maximum number of events buffered in the channel before old events are
/// dropped for slow subscribers.
///
/// Passed as the capacity argument to `broadcast::channel` in
/// `EventBus::new`.
const EVENT_CHANNEL_CAPACITY: usize = 256;
23
/// Events that can be emitted by gatel modules.
///
/// Values of this type are what travels over the `EventBus` broadcast
/// channel. The type is `Clone` so it can be handed to multiple receivers,
/// and `PartialEq`/`Eq` so that events can be compared directly (for example
/// with `assert_eq!` in tests, or `event == Event::ShutdownInitiated` in a
/// subscriber loop).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// Configuration was reloaded successfully.
    ConfigReloaded,
    /// Configuration reload failed.
    ConfigReloadFailed { error: String },
    /// A TLS certificate was issued or renewed.
    CertIssued { domain: String },
    /// A TLS certificate renewal failed.
    CertRenewalFailed { domain: String, error: String },
    /// An upstream backend changed health status.
    UpstreamHealthChanged { address: String, healthy: bool },
    /// The server is shutting down.
    ShutdownInitiated,
    /// Custom event from a plugin module.
    Custom { name: String, data: String },
}
42
43impl fmt::Display for Event {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        match self {
46            Event::ConfigReloaded => write!(f, "config_reloaded"),
47            Event::ConfigReloadFailed { error } => write!(f, "config_reload_failed: {error}"),
48            Event::CertIssued { domain } => write!(f, "cert_issued: {domain}"),
49            Event::CertRenewalFailed { domain, error } => {
50                write!(f, "cert_renewal_failed: {domain}: {error}")
51            }
52            Event::UpstreamHealthChanged { address, healthy } => {
53                write!(f, "upstream_health: {address} healthy={healthy}")
54            }
55            Event::ShutdownInitiated => write!(f, "shutdown_initiated"),
56            Event::Custom { name, data } => write!(f, "custom:{name}: {data}"),
57        }
58    }
59}
60
61/// Broadcast-based event bus.
62///
63/// Cloning an `EventBus` produces a handle to the same underlying channel.
64#[derive(Clone)]
65pub struct EventBus {
66    tx: broadcast::Sender<Event>,
67}
68
69impl EventBus {
70    /// Create a new event bus.
71    pub fn new() -> Self {
72        let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
73        Self { tx }
74    }
75
76    /// Emit an event to all subscribers.
77    ///
78    /// Returns the number of subscribers that received the event.
79    /// If there are no subscribers the event is silently dropped.
80    pub fn emit(&self, event: Event) -> usize {
81        self.tx.send(event).unwrap_or(0)
82    }
83
84    /// Subscribe to events. Returns a receiver that yields events.
85    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
86        self.tx.subscribe()
87    }
88}
89
90impl Default for EventBus {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// An event emitted after subscribing is delivered to that subscriber.
    #[tokio::test]
    async fn emit_and_receive() {
        let bus = EventBus::new();
        let mut receiver = bus.subscribe();

        bus.emit(Event::ConfigReloaded);

        let received = receiver.recv().await.unwrap();
        assert!(matches!(received, Event::ConfigReloaded));
    }

    /// Every subscriber gets its own copy, and `emit` reports both of them.
    #[tokio::test]
    async fn multiple_subscribers() {
        let bus = EventBus::new();
        let mut first_rx = bus.subscribe();
        let mut second_rx = bus.subscribe();

        let delivered = bus.emit(Event::ShutdownInitiated);
        assert_eq!(delivered, 2);

        let first = first_rx.recv().await.unwrap();
        assert!(matches!(first, Event::ShutdownInitiated));

        let second = second_rx.recv().await.unwrap();
        assert!(matches!(second, Event::ShutdownInitiated));
    }

    /// Emitting with nobody listening reports zero receivers.
    #[tokio::test]
    async fn no_subscribers() {
        let bus = EventBus::new();
        let delivered = bus.emit(Event::ConfigReloaded);
        assert_eq!(delivered, 0);
    }

    /// Payload fields of a custom event arrive unchanged.
    #[tokio::test]
    async fn custom_event() {
        let bus = EventBus::new();
        let mut receiver = bus.subscribe();

        let outgoing = Event::Custom {
            name: "my_plugin".into(),
            data: "something happened".into(),
        };
        bus.emit(outgoing);

        match receiver.recv().await.unwrap() {
            Event::Custom { name, data } => {
                assert_eq!(name, "my_plugin");
                assert_eq!(data, "something happened");
            }
            _ => panic!("expected Custom event"),
        }
    }
}