Skip to main content

irtt_client/managed/
hub.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    sync::{
4        atomic::{AtomicU64, Ordering},
5        Arc, Condvar, Mutex, Weak,
6    },
7};
8
9use crate::{error::EventSubscriptionError, event::ClientEvent};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub struct SubscriberConfig {
13    pub capacity: usize,
14    pub overflow: SubscriberOverflow,
15}
16
17impl Default for SubscriberConfig {
18    fn default() -> Self {
19        Self {
20            capacity: 1024,
21            overflow: SubscriberOverflow::DropNewest,
22        }
23    }
24}
25
26/// Overflow behavior for a bounded event subscription queue.
27///
28/// These policies are applied independently per subscriber when that
29/// subscriber's queue is full.
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub enum SubscriberOverflow {
32    /// Leave the existing queue unchanged and discard the newly published event.
33    DropNewest,
34    /// Remove the oldest queued event and enqueue the newly published event.
35    DropOldest,
36    /// Disconnect the subscriber and clear its queue.
37    Disconnect,
38}
39
40#[derive(Debug, Clone)]
41pub struct EventHub {
42    inner: Arc<HubInner>,
43}
44
45#[derive(Debug)]
46struct HubInner {
47    next_id: AtomicU64,
48    subscribers: Mutex<HashMap<u64, Arc<SubscriberInner>>>,
49}
50
51/// Handle for receiving managed client events.
52///
53/// Each subscription has its own bounded queue configured by
54/// [`SubscriberConfig`]. The configured [`SubscriberOverflow`] policy applies
55/// when that queue is full. Dropping the handle unregisters the subscriber.
56#[must_use = "dropping the subscription unregisters it"]
57#[derive(Debug)]
58pub struct EventSubscription {
59    id: u64,
60    hub: Weak<HubInner>,
61    inner: Arc<SubscriberInner>,
62}
63
64#[derive(Debug)]
65struct SubscriberInner {
66    state: Mutex<SubscriberState>,
67    available: Condvar,
68    config: SubscriberConfig,
69}
70
71#[derive(Debug)]
72struct SubscriberState {
73    queue: VecDeque<ClientEvent>,
74    connected: bool,
75}
76
77impl EventHub {
78    pub fn new() -> Self {
79        Self {
80            inner: Arc::new(HubInner {
81                next_id: AtomicU64::new(1),
82                subscribers: Mutex::new(HashMap::new()),
83            }),
84        }
85    }
86
87    pub fn subscribe(
88        &self,
89        config: SubscriberConfig,
90    ) -> Result<EventSubscription, crate::ClientError> {
91        if config.capacity == 0 {
92            return Err(crate::ClientError::InvalidConfig {
93                reason: "subscriber capacity must be greater than zero".to_owned(),
94            });
95        }
96
97        let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
98        let inner = Arc::new(SubscriberInner {
99            state: Mutex::new(SubscriberState {
100                queue: VecDeque::with_capacity(config.capacity),
101                connected: true,
102            }),
103            available: Condvar::new(),
104            config,
105        });
106        self.inner
107            .subscribers
108            .lock()
109            .expect("event hub mutex poisoned")
110            .insert(id, inner.clone());
111
112        Ok(EventSubscription {
113            id,
114            hub: Arc::downgrade(&self.inner),
115            inner,
116        })
117    }
118
119    pub fn publish(&self, event: ClientEvent) {
120        let subscribers: Vec<(u64, Arc<SubscriberInner>)> = self
121            .inner
122            .subscribers
123            .lock()
124            .expect("event hub mutex poisoned")
125            .iter()
126            .map(|(id, subscriber)| (*id, subscriber.clone()))
127            .collect();
128
129        let mut disconnected = Vec::new();
130        for (id, subscriber) in subscribers {
131            // Events are cloned per subscriber; bounded queues keep slow
132            // consumers from causing unbounded memory growth.
133            if !subscriber.publish(event.clone()) {
134                disconnected.push(id);
135            }
136        }
137
138        if disconnected.is_empty() {
139            return;
140        }
141
142        let mut subscribers = self
143            .inner
144            .subscribers
145            .lock()
146            .expect("event hub mutex poisoned");
147        for id in disconnected {
148            subscribers.remove(&id);
149        }
150    }
151
152    pub fn disconnect_all(&self) {
153        let subscribers: Vec<Arc<SubscriberInner>> = self
154            .inner
155            .subscribers
156            .lock()
157            .expect("event hub mutex poisoned")
158            .drain()
159            .map(|(_, subscriber)| subscriber)
160            .collect();
161
162        for subscriber in subscribers {
163            subscriber.disconnect();
164        }
165    }
166
167    #[cfg(test)]
168    fn subscriber_count(&self) -> usize {
169        self.inner
170            .subscribers
171            .lock()
172            .expect("event hub mutex poisoned")
173            .len()
174    }
175}
176
177impl Default for EventHub {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183impl EventSubscription {
184    /// Block until the next queued event is available.
185    ///
186    /// If the subscription is disconnected, this returns
187    /// [`EventSubscriptionError::Disconnected`] after any already queued events
188    /// have been drained.
189    pub fn recv(&self) -> Result<ClientEvent, EventSubscriptionError> {
190        let mut state = self.inner.state.lock().expect("subscriber mutex poisoned");
191        loop {
192            if let Some(event) = state.queue.pop_front() {
193                return Ok(event);
194            }
195            if !state.connected {
196                return Err(EventSubscriptionError::Disconnected);
197            }
198            state = self
199                .inner
200                .available
201                .wait(state)
202                .expect("subscriber mutex poisoned");
203        }
204    }
205
206    /// Try to receive one queued event without blocking.
207    ///
208    /// Returns `Ok(None)` when the subscription is still connected but no event
209    /// is queued. If the subscription is disconnected, this returns
210    /// [`EventSubscriptionError::Disconnected`] after any already queued events
211    /// have been drained.
212    pub fn try_recv(&self) -> Result<Option<ClientEvent>, EventSubscriptionError> {
213        let mut state = self.inner.state.lock().expect("subscriber mutex poisoned");
214        if let Some(event) = state.queue.pop_front() {
215            return Ok(Some(event));
216        }
217        if !state.connected {
218            return Err(EventSubscriptionError::Disconnected);
219        }
220        Ok(None)
221    }
222}
223
224impl Drop for EventSubscription {
225    fn drop(&mut self) {
226        self.inner.disconnect();
227        if let Some(hub) = self.hub.upgrade() {
228            hub.subscribers
229                .lock()
230                .expect("event hub mutex poisoned")
231                .remove(&self.id);
232        }
233    }
234}
235
236impl SubscriberInner {
237    fn publish(&self, event: ClientEvent) -> bool {
238        let mut state = self.state.lock().expect("subscriber mutex poisoned");
239        if !state.connected {
240            return false;
241        }
242
243        if state.queue.len() < self.config.capacity {
244            state.queue.push_back(event);
245            self.available.notify_one();
246            return true;
247        }
248
249        match self.config.overflow {
250            SubscriberOverflow::DropNewest => true,
251            SubscriberOverflow::DropOldest => {
252                state.queue.pop_front();
253                state.queue.push_back(event);
254                self.available.notify_one();
255                true
256            }
257            SubscriberOverflow::Disconnect => {
258                state.queue.clear();
259                state.connected = false;
260                self.available.notify_all();
261                false
262            }
263        }
264    }
265
266    fn disconnect(&self) {
267        let mut state = self.state.lock().expect("subscriber mutex poisoned");
268        state.connected = false;
269        self.available.notify_all();
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use crate::{ClientTimestamp, WarningKind};
277
278    fn event(n: usize) -> ClientEvent {
279        ClientEvent::Warning {
280            kind: WarningKind::UntrackedReply,
281            message: format!("event-{n}"),
282        }
283    }
284
285    fn event_message(event: ClientEvent) -> String {
286        match event {
287            ClientEvent::Warning { message, .. } => message,
288            other => panic!("unexpected event: {other:?}"),
289        }
290    }
291
292    #[test]
293    fn publishes_to_one_subscriber() {
294        let hub = EventHub::new();
295        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
296
297        hub.publish(event(1));
298
299        assert_eq!(event_message(sub.try_recv().unwrap().unwrap()), "event-1");
300    }
301
302    #[test]
303    fn publishes_to_multiple_subscribers() {
304        let hub = EventHub::new();
305        let a = hub.subscribe(SubscriberConfig::default()).unwrap();
306        let b = hub.subscribe(SubscriberConfig::default()).unwrap();
307
308        hub.publish(event(1));
309
310        assert_eq!(event_message(a.try_recv().unwrap().unwrap()), "event-1");
311        assert_eq!(event_message(b.try_recv().unwrap().unwrap()), "event-1");
312    }
313
314    #[test]
315    fn drop_newest_keeps_existing_events() {
316        let hub = EventHub::new();
317        let sub = hub
318            .subscribe(SubscriberConfig {
319                capacity: 1,
320                overflow: SubscriberOverflow::DropNewest,
321            })
322            .unwrap();
323
324        hub.publish(event(1));
325        hub.publish(event(2));
326
327        assert_eq!(event_message(sub.try_recv().unwrap().unwrap()), "event-1");
328        assert!(sub.try_recv().unwrap().is_none());
329    }
330
331    #[test]
332    fn drop_oldest_replaces_existing_events() {
333        let hub = EventHub::new();
334        let sub = hub
335            .subscribe(SubscriberConfig {
336                capacity: 1,
337                overflow: SubscriberOverflow::DropOldest,
338            })
339            .unwrap();
340
341        hub.publish(event(1));
342        hub.publish(event(2));
343
344        assert_eq!(event_message(sub.try_recv().unwrap().unwrap()), "event-2");
345        assert!(sub.try_recv().unwrap().is_none());
346    }
347
348    #[test]
349    fn disconnect_removes_full_subscriber() {
350        let hub = EventHub::new();
351        let sub = hub
352            .subscribe(SubscriberConfig {
353                capacity: 1,
354                overflow: SubscriberOverflow::Disconnect,
355            })
356            .unwrap();
357
358        hub.publish(event(1));
359        hub.publish(event(2));
360
361        assert_eq!(
362            sub.try_recv().unwrap_err(),
363            EventSubscriptionError::Disconnected
364        );
365    }
366
367    #[test]
368    fn full_subscriber_does_not_prevent_other_delivery() {
369        let hub = EventHub::new();
370        let slow = hub
371            .subscribe(SubscriberConfig {
372                capacity: 1,
373                overflow: SubscriberOverflow::DropNewest,
374            })
375            .unwrap();
376        let fast = hub
377            .subscribe(SubscriberConfig {
378                capacity: 4,
379                overflow: SubscriberOverflow::DropNewest,
380            })
381            .unwrap();
382
383        hub.publish(event(1));
384        hub.publish(event(2));
385
386        assert_eq!(event_message(slow.try_recv().unwrap().unwrap()), "event-1");
387        assert_eq!(event_message(fast.try_recv().unwrap().unwrap()), "event-1");
388        assert_eq!(event_message(fast.try_recv().unwrap().unwrap()), "event-2");
389    }
390
391    #[test]
392    fn subscribing_after_events_does_not_replay() {
393        let hub = EventHub::new();
394        hub.publish(event(1));
395
396        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
397
398        assert!(sub.try_recv().unwrap().is_none());
399    }
400
401    #[test]
402    fn dropping_subscription_unregisters_from_hub() {
403        let hub = EventHub::new();
404        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
405        assert_eq!(hub.subscriber_count(), 1);
406
407        drop(sub);
408
409        assert_eq!(hub.subscriber_count(), 0);
410    }
411
412    #[test]
413    fn disconnect_all_wakes_blocking_receivers() {
414        let hub = EventHub::new();
415        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
416        hub.disconnect_all();
417
418        assert_eq!(
419            sub.recv().unwrap_err(),
420            EventSubscriptionError::Disconnected
421        );
422    }
423
424    #[test]
425    fn session_closed_event_can_be_queued() {
426        let hub = EventHub::new();
427        let sub = hub.subscribe(SubscriberConfig::default()).unwrap();
428        hub.publish(ClientEvent::SessionClosed {
429            remote: "127.0.0.1:1".parse().unwrap(),
430            token: 1,
431            at: ClientTimestamp::now(),
432        });
433
434        assert!(matches!(
435            sub.recv().unwrap(),
436            ClientEvent::SessionClosed { token: 1, .. }
437        ));
438    }
439}