1#[doc = include_str!("../README.md")]
2use futures::channel::{
3 mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
4 oneshot::{channel, Receiver, Sender},
5};
6use futures_lite::{Future, Stream};
7use std::sync::mpsc::{channel as sync_channel, Receiver as SyncReceiver, Sender as SyncSender};
8use std::{
9 collections::{HashMap, LinkedList},
10 hash::Hash,
11 marker::PhantomData,
12 pin::Pin,
13 sync::{Arc, Mutex},
14 task::{Context, Poll},
15};
16
17pub struct Subscriber<Event, Payload> {
35 sender_sender: SyncSender<UnboundedSender<Payload>>,
37 recv: UnboundedReceiver<Payload>,
38 _event: PhantomData<Event>,
39}
40impl<Event, Payload> Clone for Subscriber<Event, Payload> {
41 fn clone(&self) -> Self {
42 let (send, recv) = unbounded();
43 let sender_sender = self.sender_sender.clone();
48 sender_sender.send(send).unwrap();
49 Self {
50 sender_sender,
52 recv,
53 _event: PhantomData,
54 }
55 }
56}
57
58pub struct SubscriberOnce<Event, Payload> {
60 recv: Receiver<Payload>,
61 _event: PhantomData<Event>,
62}
63#[derive(Debug)]
64struct InnerEmitterMultiple<Payload> {
65 send: UnboundedSender<Payload>,
66}
67#[derive(Debug)]
68struct InnerEmitterOnce<Payload> {
69 send: Sender<Payload>,
70}
71#[derive(Debug)]
72enum InnerEmitter<Payload> {
73 Multiple(InnerEmitterMultiple<Payload>),
74 Once(InnerEmitterOnce<Payload>),
75}
76
77#[derive(Debug)]
78struct EmitterData<Payload> {
79 emitters: LinkedList<InnerEmitter<Payload>>,
80 sender_sender: SyncSender<UnboundedSender<Payload>>,
81 sender_recv: SyncReceiver<UnboundedSender<Payload>>,
82}
83impl<Payload> Default for EmitterData<Payload> {
84 fn default() -> Self {
85 let (send, recv) = sync_channel();
86 Self {
87 emitters: LinkedList::default(),
88 sender_sender: send,
89 sender_recv: recv,
90 }
91 }
92}
93
94pub struct EventManager<Event: Hash + Eq, Payload: Clone> {
103 emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
105}
106
107impl<Event: Hash + Eq, Payload: Clone> EventManager<Event, Payload> {
108 pub fn new() -> Self {
111 Self {
112 emitters: Arc::new(Mutex::new(HashMap::new())),
113 }
114 }
115 pub fn emitter(&mut self, event: Event) -> Emitter<Event, Payload> {
117 Emitter(self.emitters.lock().unwrap().entry(event).or_default().clone(), PhantomData)
118 }
119 pub fn emitter_any(&mut self) -> EmitterAny<Event, Payload> {
121 EmitterAny{emitters: self.emitters.clone()}
122 }
123 pub fn subscribe(&mut self, event: Event) -> Subscriber<Event, Payload> {
125 let (send, recv) = unbounded();
126 let emitter = InnerEmitter::Multiple(InnerEmitterMultiple { send });
127 let sender_sender = self.insert_emitter(event, emitter);
130 Subscriber {
131 sender_sender,
133 recv,
134 _event: PhantomData,
135 }
136 }
137 pub fn once(&mut self, event: Event) -> SubscriberOnce<Event, Payload> {
139 let (send, recv) = channel();
140 let emitter = InnerEmitter::Once(InnerEmitterOnce { send });
141 self.insert_emitter(event, emitter);
142 SubscriberOnce {
143 recv,
144 _event: PhantomData,
145 }
146 }
147 fn insert_emitter(
148 &mut self,
149 event: Event,
150 emitter: InnerEmitter<Payload>,
151 ) -> SyncSender<UnboundedSender<Payload>> {
152 let mut lock = self.emitters.lock().unwrap();
153 let emitter_data_lock = lock.entry(event).or_default();
154 emitter_data_lock
155 .lock()
156 .unwrap()
157 .emitters
158 .push_back(emitter);
159 let sender = emitter_data_lock.lock().unwrap().sender_sender.clone();
160 sender
161 }
171 fn handle_cloned_subscribers(emitter_data_lock: &Arc<Mutex<EmitterData<Payload>>>) {
172 let mut emitter_data = emitter_data_lock.lock().unwrap();
173 let sender_iter = emitter_data.sender_recv.try_iter();
174 let emitters: Vec<_> = sender_iter
175 .into_iter()
176 .map(|send| InnerEmitter::Multiple(InnerEmitterMultiple { send }))
177 .collect();
178 emitter_data.emitters.extend(emitters);
179 }
180 pub fn emit(&mut self, event: Event, payload: Payload) {
182 if let Some(emitter_data_lock) = self.emitters.lock().unwrap().get_mut(&event) {
183 Self::handle_cloned_subscribers(emitter_data_lock);
184 let mut emitter_data = emitter_data_lock.lock().unwrap();
185 let list = &mut emitter_data.emitters;
188 EventManager::<Event, Payload>::list_emit(list, payload);
189 }
190 }
191 fn list_emit(list_guard: &mut LinkedList<InnerEmitter<Payload>>, payload: Payload) {
192 let list = std::mem::take(&mut *list_guard);
195 let list: LinkedList<InnerEmitter<Payload>> = list
196 .into_iter()
197 .filter_map(|emitter| {
198 match emitter {
199 InnerEmitter::Multiple(ref emitter) => {
200 if emitter.send.unbounded_send(payload.clone()).is_err() {
201 return None;
202 }
203 }
204 InnerEmitter::Once(emitter) => {
205 let _ = emitter.send.send(payload.clone());
207 return None;
208 }
209 }
210 Some(emitter)
211 })
212 .collect();
213 *list_guard = list;
214 }
215}
216
217#[derive(Debug, Clone)]
232pub struct Emitter<Event, Payload>(Arc<Mutex<EmitterData<Payload>>>, PhantomData<Event>);
233
234impl<Event: Hash + Eq, Payload: Clone> Emitter<Event, Payload> {
235 pub fn emit(&mut self, payload: Payload) {
237 EventManager::<Event, Payload>::handle_cloned_subscribers(&mut self.0);
238 EventManager::<Event, Payload>::list_emit(&mut self.0.lock().unwrap().emitters, payload)
239 }
240}
241
242pub struct EmitterAny<Event: Hash + Eq, Payload: Clone> {
243 emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
245}
246
247impl<Event: Hash + Eq, Payload: Clone> EmitterAny<Event, Payload> {
248 pub fn emit(&mut self, event: Event, payload: Payload) {
250 if let Some(emitter_data_lock) = self.emitters.lock().unwrap().get_mut(&event) {
251 EventManager::<Event, Payload>::handle_cloned_subscribers(emitter_data_lock);
252 let mut emitter_data = emitter_data_lock.lock().unwrap();
253 let list = &mut emitter_data.emitters;
256 EventManager::<Event, Payload>::list_emit(list, payload);
257 }
258 }
259}
260
261impl<Event, Payload> Stream for Subscriber<Event, Payload> {
262 type Item = Payload;
263 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
264 unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll_next(cx)
266 }
267}
268
269type Canceled = futures::channel::oneshot::Canceled;
270impl<Event, Payload> Future for SubscriberOnce<Event, Payload> {
271 type Output = Result<Payload, Canceled>;
272
273 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274 unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll(cx)
275 }
278}
279
/// An event identified by a string name.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct StringEvent(String);

impl<T: Into<String>> From<T> for StringEvent {
    /// Builds the event from anything convertible into a `String`, such as
    /// `&str` or `String`.
    fn from(value: T) -> Self {
        StringEvent(value.into())
    }
}
289
#[cfg(test)]
pub mod tests {
    use crate::{EventManager, StringEvent};
    use futures::executor::block_on;
    use futures_lite::StreamExt;

    /// A one-shot subscriber resolves with the emitted payload.
    #[test]
    fn test_events_once() {
        let mut events: EventManager<StringEvent, i32> = EventManager::new();
        let pending = events.once("Event".into());
        events.emit("Event".into(), 57);
        assert_eq!(block_on(pending), Ok(57));
    }

    /// An `EmitterAny` handle reaches subscribers registered on the manager.
    #[test]
    fn test_events_any() {
        let mut events: EventManager<StringEvent, i32> = EventManager::new();
        let mut any_emitter = events.emitter_any();
        let pending = events.once("Event".into());
        any_emitter.emit("Event".into(), 57);
        assert_eq!(block_on(pending), Ok(57));
    }

    /// A stream subscriber yields emitted payloads and ends once the manager is dropped.
    #[test]
    fn test_events_multiple() {
        let mut events: EventManager<StringEvent, i32> = EventManager::new();
        let mut stream = events.subscribe("Event".into());
        events.emit("Event".into(), 57);
        assert_eq!(block_on(stream.next()), Some(57));
        drop(events);
        assert_eq!(block_on(stream.next()), None);
    }

    /// A cloned subscriber receives payloads emitted after the clone was made,
    /// and both streams end once the manager is dropped.
    #[test]
    fn test_clone_subscriber() {
        let mut events: EventManager<StringEvent, i32> = EventManager::new();
        let mut first = events.subscribe("Event".into());
        events.emit("Event".into(), 57);
        assert_eq!(block_on(first.next()), Some(57));

        let mut second = first.clone();
        events.emit("Event".into(), 58);
        assert_eq!(block_on(first.next()), Some(58));
        assert_eq!(block_on(second.next()), Some(58));

        drop(events);
        assert_eq!(block_on(first.next()), None);
        assert_eq!(block_on(second.next()), None);
    }
}