1use std::collections::HashMap;
11use std::collections::hash_map::Entry;
12use std::hash::Hash;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex, MutexGuard};
15use std::task::{Context, Poll};
16
17use futures_channel::mpsc;
18use futures_core::Stream;
19
20pub trait RoutableEvent: Clone {
22 type Target: Eq + Hash + Clone;
24
25 fn targets(&self) -> impl Iterator<Item = Self::Target>;
28}
29
30pub trait Publisher<E> {
35 fn publish(&self, event: E);
37}
38
39pub struct NoopPublisher;
42
43impl<E> Publisher<E> for NoopPublisher {
44 fn publish(&self, _event: E) {}
45}
46
47pub trait EventHandler<E> {
53 type Publish: Publisher<E>;
55
56 fn init() -> (Self::Publish, Self);
58}
59
60pub struct NoopHandler;
63
64impl<E> EventHandler<E> for NoopHandler {
65 type Publish = NoopPublisher;
66 fn init() -> (Self::Publish, Self) {
67 (NoopPublisher, NoopHandler)
68 }
69}
70
71const SUBSCRIBER_BUFFER: usize = 256;
73
74type Subscribers<T> = HashMap<u64, mpsc::Sender<T>>;
76
77struct Inner<T: RoutableEvent> {
78 targets: HashMap<T::Target, Subscribers<T>>,
79 next_id: u64,
80}
81
82pub struct EventBusPublisher<T: RoutableEvent> {
84 inner: Arc<Mutex<Inner<T>>>,
85}
86
87pub struct EventBus<T: RoutableEvent> {
89 inner: Arc<Mutex<Inner<T>>>,
90}
91
92impl<T: RoutableEvent> Clone for EventBus<T> {
93 fn clone(&self) -> Self {
94 Self {
95 inner: Arc::clone(&self.inner),
96 }
97 }
98}
99
100pub fn event_bus<T: RoutableEvent>() -> (EventBusPublisher<T>, EventBus<T>) {
102 let inner = Arc::new(Mutex::new(Inner {
103 targets: HashMap::new(),
104 next_id: 0,
105 }));
106 (
107 EventBusPublisher {
108 inner: Arc::clone(&inner),
109 },
110 EventBus { inner },
111 )
112}
113
114impl<T: RoutableEvent> EventHandler<T> for EventBus<T> {
115 type Publish = EventBusPublisher<T>;
116 fn init() -> (Self::Publish, Self) {
117 event_bus()
118 }
119}
120
121impl<T: RoutableEvent> Publisher<T> for EventBusPublisher<T> {
122 fn publish(&self, event: T) {
125 let mut inner = lock(&self.inner);
126 for target in event.targets() {
127 if let Entry::Occupied(mut e) = inner.targets.entry(target) {
128 e.get_mut()
129 .retain(|_, tx| tx.try_send(event.clone()).is_ok());
130 if e.get().is_empty() {
131 e.remove();
132 }
133 }
134 }
135 }
136}
137
138impl<T: RoutableEvent> EventBus<T> {
139 pub fn subscribe(&self, target: T::Target) -> Subscription<T> {
141 let (tx, rx) = mpsc::channel(SUBSCRIBER_BUFFER);
142 let mut inner = lock(&self.inner);
143 let id = inner.next_id;
144 inner.next_id += 1;
145 inner
146 .targets
147 .entry(target.clone())
148 .or_default()
149 .insert(id, tx);
150 Subscription {
151 inner: Arc::clone(&self.inner),
152 target,
153 id,
154 rx,
155 }
156 }
157}
158
159pub struct Subscription<T: RoutableEvent> {
162 inner: Arc<Mutex<Inner<T>>>,
163 target: T::Target,
164 id: u64,
165 rx: mpsc::Receiver<T>,
166}
167
168impl<T: RoutableEvent> Stream for Subscription<T>
169where
170 T::Target: Unpin,
171{
172 type Item = T;
173
174 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
175 Pin::new(&mut self.get_mut().rx).poll_next(cx)
176 }
177}
178
179impl<T: RoutableEvent> Drop for Subscription<T> {
180 fn drop(&mut self) {
181 let mut inner = lock(&self.inner);
182 if let Entry::Occupied(mut e) = inner.targets.entry(self.target.clone()) {
183 e.get_mut().remove(&self.id);
184 if e.get().is_empty() {
185 e.remove();
186 }
187 }
188 }
189}
190
191fn lock<G>(m: &Mutex<G>) -> MutexGuard<'_, G> {
196 m.lock().unwrap_or_else(|e| e.into_inner())
197}
198
199#[cfg(test)]
200mod tests {
201 use futures::{FutureExt, StreamExt};
202
203 use super::*;
204
205 #[derive(Clone)]
206 struct Ev {
207 n: i32,
208 to: Vec<u32>,
209 }
210
211 impl RoutableEvent for Ev {
212 type Target = u32;
213 fn targets(&self) -> impl Iterator<Item = u32> {
214 self.to.clone().into_iter()
215 }
216 }
217
218 fn ev(n: i32, to: &[u32]) -> Ev {
219 Ev { n, to: to.to_vec() }
220 }
221
222 fn recv(sub: &mut Subscription<Ev>) -> Option<i32> {
224 sub.next().now_or_never().flatten().map(|e| e.n)
225 }
226
227 fn target_count(bus: &EventBus<Ev>) -> usize {
228 lock(&bus.inner).targets.len()
229 }
230
231 #[test]
232 fn delivers_to_subscriber() {
233 let (p, bus) = event_bus::<Ev>();
234 let mut s = bus.subscribe(1);
235 p.publish(ev(10, &[1]));
236 assert_eq!(recv(&mut s), Some(10));
237 }
238
239 #[test]
240 fn fans_out_to_all_targets_of_an_event() {
241 let (p, bus) = event_bus::<Ev>();
242 let mut a = bus.subscribe(1);
243 let mut b = bus.subscribe(2);
244 p.publish(ev(7, &[1, 2]));
245 assert_eq!(recv(&mut a), Some(7));
246 assert_eq!(recv(&mut b), Some(7));
247 }
248
249 #[test]
250 fn other_targets_are_not_delivered() {
251 let (p, bus) = event_bus::<Ev>();
252 let mut a = bus.subscribe(1);
253 p.publish(ev(1, &[2]));
254 assert_eq!(recv(&mut a), None);
255 }
256
257 #[test]
258 fn last_unsubscribe_removes_the_target() {
259 let (_p, bus) = event_bus::<Ev>();
260 let a = bus.subscribe(1);
261 let b = bus.subscribe(1);
262 assert_eq!(target_count(&bus), 1);
263 drop(a);
264 assert_eq!(target_count(&bus), 1); drop(b);
266 assert_eq!(target_count(&bus), 0); }
268
269 #[test]
270 fn overflowing_subscriber_is_pruned_on_publish() {
271 let (p, bus) = event_bus::<Ev>();
272 let _s = bus.subscribe(1); for i in 0..(SUBSCRIBER_BUFFER as i32 + 8) {
274 p.publish(ev(i, &[1]));
275 }
276 assert_eq!(target_count(&bus), 0); }
278}