Skip to main content

ubiquisync_core/
event.rs

//! In-process fan-out of change events to subscribers, keyed by target.
//!
//! [`event_bus`] returns an [`EventBusPublisher`]/[`EventBus`] pair sharing one
//! mutex-guarded map: the producer holds the publisher and emits events; subscribers
//! hold an `EventBus` and [`subscribe`](EventBus::subscribe) to a
//! [`RoutableEvent::Target`], receiving a [`Subscription`] (a [`Stream`]) of the
//! events routed there. Targets left with no subscribers are removed eagerly, so
//! the map holds only the targets that currently have at least one subscriber.
9
10use std::collections::HashMap;
11use std::collections::hash_map::Entry;
12use std::hash::Hash;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex, MutexGuard};
15use std::task::{Context, Poll};
16
17use futures_channel::mpsc;
18use futures_core::Stream;
19
/// An event that carries its own routing: it can enumerate every target it
/// has to be delivered to.
///
/// `Clone` is a supertrait because the bus gives each subscriber its own copy
/// of a published event.
pub trait RoutableEvent: Clone {
    /// The key a subscriber picks when subscribing; one subscription watches
    /// exactly one `Target`.
    type Target: Clone + Eq + Hash;

    /// Every target this event should reach.
    ///
    /// The result is meant to be a set, but duplicates are not filtered out:
    /// a target yielded twice has the event delivered to its subscribers
    /// twice. Implementations should therefore yield each target once.
    fn targets(&self) -> impl Iterator<Item = Self::Target>;
}
29
/// Anything events can be pushed into.
///
/// The write end of an [`event_bus`] implements this, and so does the no-op
/// [`NoopPublisher`]; a producer that holds `dyn Publisher` or `impl Publisher`
/// can have either one swapped in behind it. The trait itself does not demand
/// `Send + Sync` — whoever shares a publisher (e.g. a cross-thread producer)
/// adds that bound.
pub trait Publisher<E> {
    /// Hand `event` over to whatever sits behind this publisher.
    fn publish(&self, event: E);
}
38
39/// A [`Publisher`] that discards every event — for producers with nothing
40/// listening (a headless replica) or that opt out of change events.
41pub struct NoopPublisher;
42
43impl<E> Publisher<E> for NoopPublisher {
44    fn publish(&self, _event: E) {}
45}
46
47/// Constructs the read side of an event stream, paired with its [`Publisher`]
48/// write end. A producer holds `impl EventHandler` so it can both emit (through
49/// `Publish`) and hand out subscriptions (through the handler itself), with no
50/// bound on the event type. `Send + Sync` is required by the sharer (a
51/// cross-thread producer), not here.
52pub trait EventHandler<E> {
53    /// The write end paired with this handler.
54    type Publish: Publisher<E>;
55
56    /// Build a fresh `(publisher, handler)` sharing one stream.
57    fn init() -> (Self::Publish, Self);
58}
59
60/// An [`EventHandler`] that discards events — pairs [`NoopPublisher`] with itself
61/// for a producer with nothing listening.
62pub struct NoopHandler;
63
64impl<E> EventHandler<E> for NoopHandler {
65    type Publish = NoopPublisher;
66    fn init() -> (Self::Publish, Self) {
67        (NoopPublisher, NoopHandler)
68    }
69}
70
/// Buffer size passed to `mpsc::channel` for each subscription.
///
/// A subscriber whose channel is full when an event is published is dropped
/// from the bus rather than waited for. Note that `futures_channel` documents a
/// bounded channel's capacity as `buffer + num_senders`; the bus keeps a single
/// sender per subscription, so a subscriber can lag by one event more than this
/// value before the next publish prunes it.
const SUBSCRIBER_BUFFER: usize = 256;
73
74/// The subscribers watching one target, keyed by subscription id.
75type Subscribers<T> = HashMap<u64, mpsc::Sender<T>>;
76
77struct Inner<T: RoutableEvent> {
78    targets: HashMap<T::Target, Subscribers<T>>,
79    next_id: u64,
80}
81
82/// The write end: emits events into the bus. Held by the producer.
83pub struct EventBusPublisher<T: RoutableEvent> {
84    inner: Arc<Mutex<Inner<T>>>,
85}
86
87/// The read end: hands out [`Subscription`]s. Cheap to clone and share.
88pub struct EventBus<T: RoutableEvent> {
89    inner: Arc<Mutex<Inner<T>>>,
90}
91
92impl<T: RoutableEvent> Clone for EventBus<T> {
93    fn clone(&self) -> Self {
94        Self {
95            inner: Arc::clone(&self.inner),
96        }
97    }
98}
99
100/// Create an event bus, returning its `(write, read)` handles.
101pub fn event_bus<T: RoutableEvent>() -> (EventBusPublisher<T>, EventBus<T>) {
102    let inner = Arc::new(Mutex::new(Inner {
103        targets: HashMap::new(),
104        next_id: 0,
105    }));
106    (
107        EventBusPublisher {
108            inner: Arc::clone(&inner),
109        },
110        EventBus { inner },
111    )
112}
113
114impl<T: RoutableEvent> EventHandler<T> for EventBus<T> {
115    type Publish = EventBusPublisher<T>;
116    fn init() -> (Self::Publish, Self) {
117        event_bus()
118    }
119}
120
121impl<T: RoutableEvent> Publisher<T> for EventBusPublisher<T> {
122    /// Deliver `event` to every subscriber of each of its targets, pruning any
123    /// whose channel is full or gone and dropping targets left with none.
124    fn publish(&self, event: T) {
125        let mut inner = lock(&self.inner);
126        for target in event.targets() {
127            if let Entry::Occupied(mut e) = inner.targets.entry(target) {
128                e.get_mut()
129                    .retain(|_, tx| tx.try_send(event.clone()).is_ok());
130                if e.get().is_empty() {
131                    e.remove();
132                }
133            }
134        }
135    }
136}
137
138impl<T: RoutableEvent> EventBus<T> {
139    /// Subscribe to every event routed to `target`.
140    pub fn subscribe(&self, target: T::Target) -> Subscription<T> {
141        let (tx, rx) = mpsc::channel(SUBSCRIBER_BUFFER);
142        let mut inner = lock(&self.inner);
143        let id = inner.next_id;
144        inner.next_id += 1;
145        inner
146            .targets
147            .entry(target.clone())
148            .or_default()
149            .insert(id, tx);
150        Subscription {
151            inner: Arc::clone(&self.inner),
152            target,
153            id,
154            rx,
155        }
156    }
157}
158
159/// A live subscription to one target: a [`Stream`] of the events routed there.
160/// Dropping it unsubscribes (and drops the target if it was the last one).
161pub struct Subscription<T: RoutableEvent> {
162    inner: Arc<Mutex<Inner<T>>>,
163    target: T::Target,
164    id: u64,
165    rx: mpsc::Receiver<T>,
166}
167
168impl<T: RoutableEvent> Stream for Subscription<T>
169where
170    T::Target: Unpin,
171{
172    type Item = T;
173
174    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
175        Pin::new(&mut self.get_mut().rx).poll_next(cx)
176    }
177}
178
179impl<T: RoutableEvent> Drop for Subscription<T> {
180    fn drop(&mut self) {
181        let mut inner = lock(&self.inner);
182        if let Entry::Occupied(mut e) = inner.targets.entry(self.target.clone()) {
183            e.get_mut().remove(&self.id);
184            if e.get().is_empty() {
185                e.remove();
186            }
187        }
188    }
189}
190
/// Acquire `m`, taking the guard even if the mutex is poisoned.
///
/// Poisoning means a thread panicked while holding the lock — for the bus,
/// e.g. an event's `Clone` panicking part-way through fan-out. That can leave
/// the map partially updated, but it is still a valid `HashMap`, so carrying on
/// with the recovered guard beats failing every later publisher and subscriber.
fn lock<G>(m: &Mutex<G>) -> MutexGuard<'_, G> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
198
#[cfg(test)]
mod tests {
    use futures::{FutureExt, StreamExt};

    use super::*;

    /// Minimal event for the tests: a payload `n` routed to the targets in `to`.
    #[derive(Clone)]
    struct Ev {
        n: i32,
        to: Vec<u32>,
    }

    impl RoutableEvent for Ev {
        type Target = u32;

        /// Yield an owned copy of the target list.
        fn targets(&self) -> impl Iterator<Item = u32> {
            self.to.clone().into_iter()
        }
    }

    /// Build an event carrying `n`, addressed to `to`.
    fn ev(n: i32, to: &[u32]) -> Ev {
        let to = to.to_vec();
        Ev { n, to }
    }

    /// Non-blocking single poll: the payload of an event that is ready right
    /// now, otherwise `None`.
    fn recv(sub: &mut Subscription<Ev>) -> Option<i32> {
        match sub.next().now_or_never() {
            Some(Some(event)) => Some(event.n),
            _ => None,
        }
    }

    /// Number of targets the bus is currently tracking.
    fn target_count(bus: &EventBus<Ev>) -> usize {
        let state = lock(&bus.inner);
        state.targets.len()
    }

    #[test]
    fn delivers_to_subscriber() {
        let (publisher, bus) = event_bus::<Ev>();
        let mut sub = bus.subscribe(1);

        publisher.publish(ev(10, &[1]));

        assert_eq!(recv(&mut sub), Some(10));
    }

    #[test]
    fn fans_out_to_all_targets_of_an_event() {
        let (publisher, bus) = event_bus::<Ev>();
        let mut first = bus.subscribe(1);
        let mut second = bus.subscribe(2);

        publisher.publish(ev(7, &[1, 2]));

        assert_eq!(recv(&mut first), Some(7));
        assert_eq!(recv(&mut second), Some(7));
    }

    #[test]
    fn other_targets_are_not_delivered() {
        let (publisher, bus) = event_bus::<Ev>();
        let mut sub = bus.subscribe(1);

        publisher.publish(ev(1, &[2]));

        assert_eq!(recv(&mut sub), None);
    }

    #[test]
    fn last_unsubscribe_removes_the_target() {
        let (_publisher, bus) = event_bus::<Ev>();
        let first = bus.subscribe(1);
        let second = bus.subscribe(1);
        assert_eq!(target_count(&bus), 1);

        drop(first);
        // `second` is still watching, so the target stays.
        assert_eq!(target_count(&bus), 1);

        drop(second);
        // No subscribers left: the target is removed eagerly.
        assert_eq!(target_count(&bus), 0);
    }

    #[test]
    fn overflowing_subscriber_is_pruned_on_publish() {
        let (publisher, bus) = event_bus::<Ev>();
        // Held for the whole test but never drained.
        let _sub = bus.subscribe(1);

        for i in 0..(SUBSCRIBER_BUFFER as i32 + 8) {
            publisher.publish(ev(i, &[1]));
        }

        // The buffer filled up, the subscriber was pruned, and its target went with it.
        assert_eq!(target_count(&bus), 0);
    }
}