1use std::{
2 collections::{HashMap, VecDeque},
3 sync::{
4 atomic::{AtomicU64, Ordering},
5 Arc, Condvar, Mutex, Weak,
6 },
7};
8
9use crate::{error::EventSubscriptionError, event::ClientEvent};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub struct SubscriberConfig {
13 pub capacity: usize,
14 pub overflow: SubscriberOverflow,
15}
16
17impl Default for SubscriberConfig {
18 fn default() -> Self {
19 Self {
20 capacity: 1024,
21 overflow: SubscriberOverflow::DropNewest,
22 }
23 }
24}
25
/// Policy applied when an event is published to a subscriber whose queue is
/// already at capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriberOverflow {
    /// Discard the incoming event and keep everything already queued.
    DropNewest,
    /// Discard the event at the front of the queue to make room for the
    /// incoming one.
    DropOldest,
    /// Discard all queued events and disconnect the subscriber.
    Disconnect,
}
39
40#[derive(Debug, Clone)]
41pub struct EventHub {
42 inner: Arc<HubInner>,
43}
44
45#[derive(Debug)]
46struct HubInner {
47 next_id: AtomicU64,
48 subscribers: Mutex<HashMap<u64, Arc<SubscriberInner>>>,
49}
50
51#[must_use = "dropping the subscription unregisters it"]
57#[derive(Debug)]
58pub struct EventSubscription {
59 id: u64,
60 hub: Weak<HubInner>,
61 inner: Arc<SubscriberInner>,
62}
63
64#[derive(Debug)]
65struct SubscriberInner {
66 state: Mutex<SubscriberState>,
67 available: Condvar,
68 config: SubscriberConfig,
69}
70
71#[derive(Debug)]
72struct SubscriberState {
73 queue: VecDeque<ClientEvent>,
74 connected: bool,
75}
76
77impl EventHub {
78 pub fn new() -> Self {
79 Self {
80 inner: Arc::new(HubInner {
81 next_id: AtomicU64::new(1),
82 subscribers: Mutex::new(HashMap::new()),
83 }),
84 }
85 }
86
87 pub fn subscribe(
88 &self,
89 config: SubscriberConfig,
90 ) -> Result<EventSubscription, crate::ClientError> {
91 if config.capacity == 0 {
92 return Err(crate::ClientError::InvalidConfig {
93 reason: "subscriber capacity must be greater than zero".to_owned(),
94 });
95 }
96
97 let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
98 let inner = Arc::new(SubscriberInner {
99 state: Mutex::new(SubscriberState {
100 queue: VecDeque::with_capacity(config.capacity),
101 connected: true,
102 }),
103 available: Condvar::new(),
104 config,
105 });
106 self.inner
107 .subscribers
108 .lock()
109 .expect("event hub mutex poisoned")
110 .insert(id, inner.clone());
111
112 Ok(EventSubscription {
113 id,
114 hub: Arc::downgrade(&self.inner),
115 inner,
116 })
117 }
118
119 pub fn publish(&self, event: ClientEvent) {
120 let subscribers: Vec<(u64, Arc<SubscriberInner>)> = self
121 .inner
122 .subscribers
123 .lock()
124 .expect("event hub mutex poisoned")
125 .iter()
126 .map(|(id, subscriber)| (*id, subscriber.clone()))
127 .collect();
128
129 let mut disconnected = Vec::new();
130 for (id, subscriber) in subscribers {
131 if !subscriber.publish(event.clone()) {
134 disconnected.push(id);
135 }
136 }
137
138 if disconnected.is_empty() {
139 return;
140 }
141
142 let mut subscribers = self
143 .inner
144 .subscribers
145 .lock()
146 .expect("event hub mutex poisoned");
147 for id in disconnected {
148 subscribers.remove(&id);
149 }
150 }
151
152 pub fn disconnect_all(&self) {
153 let subscribers: Vec<Arc<SubscriberInner>> = self
154 .inner
155 .subscribers
156 .lock()
157 .expect("event hub mutex poisoned")
158 .drain()
159 .map(|(_, subscriber)| subscriber)
160 .collect();
161
162 for subscriber in subscribers {
163 subscriber.disconnect();
164 }
165 }
166
167 #[cfg(test)]
168 fn subscriber_count(&self) -> usize {
169 self.inner
170 .subscribers
171 .lock()
172 .expect("event hub mutex poisoned")
173 .len()
174 }
175}
176
177impl Default for EventHub {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183impl EventSubscription {
184 pub fn recv(&self) -> Result<ClientEvent, EventSubscriptionError> {
190 let mut state = self.inner.state.lock().expect("subscriber mutex poisoned");
191 loop {
192 if let Some(event) = state.queue.pop_front() {
193 return Ok(event);
194 }
195 if !state.connected {
196 return Err(EventSubscriptionError::Disconnected);
197 }
198 state = self
199 .inner
200 .available
201 .wait(state)
202 .expect("subscriber mutex poisoned");
203 }
204 }
205
206 pub fn try_recv(&self) -> Result<Option<ClientEvent>, EventSubscriptionError> {
213 let mut state = self.inner.state.lock().expect("subscriber mutex poisoned");
214 if let Some(event) = state.queue.pop_front() {
215 return Ok(Some(event));
216 }
217 if !state.connected {
218 return Err(EventSubscriptionError::Disconnected);
219 }
220 Ok(None)
221 }
222}
223
224impl Drop for EventSubscription {
225 fn drop(&mut self) {
226 self.inner.disconnect();
227 if let Some(hub) = self.hub.upgrade() {
228 hub.subscribers
229 .lock()
230 .expect("event hub mutex poisoned")
231 .remove(&self.id);
232 }
233 }
234}
235
236impl SubscriberInner {
237 fn publish(&self, event: ClientEvent) -> bool {
238 let mut state = self.state.lock().expect("subscriber mutex poisoned");
239 if !state.connected {
240 return false;
241 }
242
243 if state.queue.len() < self.config.capacity {
244 state.queue.push_back(event);
245 self.available.notify_one();
246 return true;
247 }
248
249 match self.config.overflow {
250 SubscriberOverflow::DropNewest => true,
251 SubscriberOverflow::DropOldest => {
252 state.queue.pop_front();
253 state.queue.push_back(event);
254 self.available.notify_one();
255 true
256 }
257 SubscriberOverflow::Disconnect => {
258 state.queue.clear();
259 state.connected = false;
260 self.available.notify_all();
261 false
262 }
263 }
264 }
265
266 fn disconnect(&self) {
267 let mut state = self.state.lock().expect("subscriber mutex poisoned");
268 state.connected = false;
269 self.available.notify_all();
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ClientTimestamp, WarningKind};

    /// Builds a warning event whose message encodes `n`, so tests can tell
    /// events apart.
    fn event(n: usize) -> ClientEvent {
        ClientEvent::Warning {
            kind: WarningKind::UntrackedReply,
            message: format!("event-{n}"),
        }
    }

    /// Extracts the message of a warning event; any other variant fails the test.
    fn event_message(event: ClientEvent) -> String {
        match event {
            ClientEvent::Warning { message, .. } => message,
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Subscribes with an explicit capacity and overflow policy.
    fn subscribe_with(
        hub: &EventHub,
        capacity: usize,
        overflow: SubscriberOverflow,
    ) -> EventSubscription {
        hub.subscribe(SubscriberConfig { capacity, overflow })
            .unwrap()
    }

    /// Message of the next queued event, or `None` when the queue is empty.
    fn next_message(sub: &EventSubscription) -> Option<String> {
        sub.try_recv().unwrap().map(event_message)
    }

    #[test]
    fn rejects_zero_capacity() {
        let hub = EventHub::new();

        let result = hub.subscribe(SubscriberConfig {
            capacity: 0,
            overflow: SubscriberOverflow::DropNewest,
        });

        assert!(matches!(
            result,
            Err(crate::ClientError::InvalidConfig { .. })
        ));
        assert_eq!(hub.subscriber_count(), 0);
    }

    #[test]
    fn publishes_to_one_subscriber() {
        let hub = EventHub::new();
        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();

        hub.publish(event(1));

        assert_eq!(next_message(&sub).as_deref(), Some("event-1"));
    }

    #[test]
    fn publishes_to_multiple_subscribers() {
        let hub = EventHub::new();
        let a = hub.subscribe(SubscriberConfig::default()).unwrap();
        let b = hub.subscribe(SubscriberConfig::default()).unwrap();

        hub.publish(event(1));

        assert_eq!(next_message(&a).as_deref(), Some("event-1"));
        assert_eq!(next_message(&b).as_deref(), Some("event-1"));
    }

    #[test]
    fn drop_newest_keeps_existing_events() {
        let hub = EventHub::new();
        let sub = subscribe_with(&hub, 1, SubscriberOverflow::DropNewest);

        hub.publish(event(1));
        hub.publish(event(2));

        assert_eq!(next_message(&sub).as_deref(), Some("event-1"));
        assert!(sub.try_recv().unwrap().is_none());
    }

    #[test]
    fn drop_oldest_replaces_existing_events() {
        let hub = EventHub::new();
        let sub = subscribe_with(&hub, 1, SubscriberOverflow::DropOldest);

        hub.publish(event(1));
        hub.publish(event(2));

        assert_eq!(next_message(&sub).as_deref(), Some("event-2"));
        assert!(sub.try_recv().unwrap().is_none());
    }

    #[test]
    fn disconnect_removes_full_subscriber() {
        let hub = EventHub::new();
        let sub = subscribe_with(&hub, 1, SubscriberOverflow::Disconnect);

        hub.publish(event(1));
        assert_eq!(hub.subscriber_count(), 1);

        // The second event overflows the single-slot queue.
        hub.publish(event(2));

        assert_eq!(
            sub.try_recv().unwrap_err(),
            EventSubscriptionError::Disconnected
        );
        // `sub` is still alive here, so the removal was done by `publish`,
        // not by the subscription's destructor.
        assert_eq!(hub.subscriber_count(), 0);
    }

    #[test]
    fn full_subscriber_does_not_prevent_other_delivery() {
        let hub = EventHub::new();
        let slow = subscribe_with(&hub, 1, SubscriberOverflow::DropNewest);
        let fast = subscribe_with(&hub, 4, SubscriberOverflow::DropNewest);

        hub.publish(event(1));
        hub.publish(event(2));

        assert_eq!(next_message(&slow).as_deref(), Some("event-1"));
        assert_eq!(next_message(&fast).as_deref(), Some("event-1"));
        assert_eq!(next_message(&fast).as_deref(), Some("event-2"));
    }

    #[test]
    fn subscribing_after_events_does_not_replay() {
        let hub = EventHub::new();
        hub.publish(event(1));

        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();

        assert!(sub.try_recv().unwrap().is_none());
    }

    #[test]
    fn dropping_subscription_unregisters_from_hub() {
        let hub = EventHub::new();
        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
        assert_eq!(hub.subscriber_count(), 1);

        drop(sub);

        assert_eq!(hub.subscriber_count(), 0);
    }

    #[test]
    fn disconnect_all_wakes_blocking_receivers() {
        let hub = EventHub::new();
        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
        hub.disconnect_all();

        assert_eq!(hub.subscriber_count(), 0);
        // The subscriber is already disconnected, so `recv` must return
        // instead of blocking.
        assert_eq!(
            sub.recv().unwrap_err(),
            EventSubscriptionError::Disconnected
        );
    }

    #[test]
    fn session_closed_event_can_be_queued() {
        let hub = EventHub::new();
        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
        hub.publish(ClientEvent::SessionClosed {
            remote: "127.0.0.1:1".parse().unwrap(),
            token: 1,
            at: ClientTimestamp::now(),
        });

        assert!(matches!(
            sub.recv().unwrap(),
            ClientEvent::SessionClosed { token: 1, .. }
        ));
    }
}