1use std::fmt;
17
18use tokio::sync::broadcast;
19
20const EVENT_CHANNEL_CAPACITY: usize = 256;
23
24#[derive(Debug, Clone)]
26pub enum Event {
27 ConfigReloaded,
29 ConfigReloadFailed { error: String },
31 CertIssued { domain: String },
33 CertRenewalFailed { domain: String, error: String },
35 UpstreamHealthChanged { address: String, healthy: bool },
37 ShutdownInitiated,
39 Custom { name: String, data: String },
41}
42
43impl fmt::Display for Event {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 match self {
46 Event::ConfigReloaded => write!(f, "config_reloaded"),
47 Event::ConfigReloadFailed { error } => write!(f, "config_reload_failed: {error}"),
48 Event::CertIssued { domain } => write!(f, "cert_issued: {domain}"),
49 Event::CertRenewalFailed { domain, error } => {
50 write!(f, "cert_renewal_failed: {domain}: {error}")
51 }
52 Event::UpstreamHealthChanged { address, healthy } => {
53 write!(f, "upstream_health: {address} healthy={healthy}")
54 }
55 Event::ShutdownInitiated => write!(f, "shutdown_initiated"),
56 Event::Custom { name, data } => write!(f, "custom:{name}: {data}"),
57 }
58 }
59}
60
61#[derive(Clone)]
65pub struct EventBus {
66 tx: broadcast::Sender<Event>,
67}
68
69impl EventBus {
70 pub fn new() -> Self {
72 let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
73 Self { tx }
74 }
75
76 pub fn emit(&self, event: Event) -> usize {
81 self.tx.send(event).unwrap_or(0)
82 }
83
84 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
86 self.tx.subscribe()
87 }
88}
89
90impl Default for EventBus {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99
100 #[tokio::test]
101 async fn emit_and_receive() {
102 let bus = EventBus::new();
103 let mut rx = bus.subscribe();
104
105 bus.emit(Event::ConfigReloaded);
106
107 let event = rx.recv().await.unwrap();
108 assert!(matches!(event, Event::ConfigReloaded));
109 }
110
111 #[tokio::test]
112 async fn multiple_subscribers() {
113 let bus = EventBus::new();
114 let mut rx1 = bus.subscribe();
115 let mut rx2 = bus.subscribe();
116
117 let count = bus.emit(Event::ShutdownInitiated);
118 assert_eq!(count, 2);
119
120 assert!(matches!(
121 rx1.recv().await.unwrap(),
122 Event::ShutdownInitiated
123 ));
124 assert!(matches!(
125 rx2.recv().await.unwrap(),
126 Event::ShutdownInitiated
127 ));
128 }
129
130 #[tokio::test]
131 async fn no_subscribers() {
132 let bus = EventBus::new();
133 let count = bus.emit(Event::ConfigReloaded);
134 assert_eq!(count, 0);
135 }
136
137 #[tokio::test]
138 async fn custom_event() {
139 let bus = EventBus::new();
140 let mut rx = bus.subscribe();
141
142 bus.emit(Event::Custom {
143 name: "my_plugin".into(),
144 data: "something happened".into(),
145 });
146
147 let event = rx.recv().await.unwrap();
148 if let Event::Custom { name, data } = event {
149 assert_eq!(name, "my_plugin");
150 assert_eq!(data, "something happened");
151 } else {
152 panic!("expected Custom event");
153 }
154 }
155}