hash_events/
lib.rs

1#[doc = include_str!("../README.md")]
2use futures::channel::{
3    mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
4    oneshot::{channel, Receiver, Sender},
5};
6use futures_lite::{Future, Stream};
7use std::sync::mpsc::{channel as sync_channel, Receiver as SyncReceiver, Sender as SyncSender};
8use std::{
9    collections::{HashMap, LinkedList},
10    hash::Hash,
11    marker::PhantomData,
12    pin::Pin,
13    sync::{Arc, Mutex},
14    task::{Context, Poll},
15};
16
17// type EmitterList<Payload> = Arc<Mutex<LinkedList<InnerEmitter<Payload>>>>;
18/// A stream to receive events. Any event emitted by an Emitter or EventManager will
19/// trigger all matching subscribers. Subscribers can be obtained from EventManagers or by cloning
20/// another subscriber.
21/// ```
22/// # use hash_events::{EventManager, StringEvent};
23/// # use futures::executor::block_on;
24/// # use futures_lite::StreamExt;
25/// let mut manager: EventManager<StringEvent, i32> = EventManager::new();
26/// let mut sub1 = manager.subscribe("Event".into());
27/// let mut sub2 = sub1.clone();
28/// manager.emit("Event".into(), 57);
29/// assert_eq!(block_on(sub1.next()), Some(57));
30/// assert_eq!(block_on(sub2.next()), Some(57));
31/// drop(manager);
32/// assert_eq!(block_on(sub2.next()), None);
33/// ```
34pub struct Subscriber<Event, Payload> {
35    // list: EmitterList<Payload>,
36    sender_sender: SyncSender<UnboundedSender<Payload>>,
37    recv: UnboundedReceiver<Payload>,
38    _event: PhantomData<Event>,
39}
40impl<Event, Payload> Clone for Subscriber<Event, Payload> {
41    fn clone(&self) -> Self {
42        let (send, recv) = unbounded();
43        // let emitter = InnerEmitter::Multiple(InnerEmitterMultiple { send });
44        // Instead of containing a list, send the new sender through a channel
45        // and check for new senders before emitting events
46        // self.list.lock().unwrap().push_back(emitter);
47        let sender_sender = self.sender_sender.clone();
48        sender_sender.send(send).unwrap();
49        Self {
50            // list: self.list.clone(),
51            sender_sender,
52            recv,
53            _event: PhantomData,
54        }
55    }
56}
57
58/// A subscriber that will only trigger once.
59pub struct SubscriberOnce<Event, Payload> {
60    recv: Receiver<Payload>,
61    _event: PhantomData<Event>,
62}
63#[derive(Debug)]
64struct InnerEmitterMultiple<Payload> {
65    send: UnboundedSender<Payload>,
66}
67#[derive(Debug)]
68struct InnerEmitterOnce<Payload> {
69    send: Sender<Payload>,
70}
71#[derive(Debug)]
72enum InnerEmitter<Payload> {
73    Multiple(InnerEmitterMultiple<Payload>),
74    Once(InnerEmitterOnce<Payload>),
75}
76
77#[derive(Debug)]
78struct EmitterData<Payload> {
79    emitters: LinkedList<InnerEmitter<Payload>>,
80    sender_sender: SyncSender<UnboundedSender<Payload>>,
81    sender_recv: SyncReceiver<UnboundedSender<Payload>>,
82}
83impl<Payload> Default for EmitterData<Payload> {
84    fn default() -> Self {
85        let (send, recv) = sync_channel();
86        Self {
87            emitters: LinkedList::default(),
88            sender_sender: send,
89            sender_recv: recv,
90        }
91    }
92}
93
94/// EventManager is used to get subscribers and emitters, but can also emit events itself
95/// ```
96/// # use hash_events::{EventManager, StringEvent};
97/// let mut manager: EventManager<StringEvent, i32> = EventManager::new();
98/// let subscriber = manager.once("Event".into());
99/// manager.emit("Event".into(), 57);
100/// assert_eq!(futures::executor::block_on(subscriber), Ok(57));
101/// ```
102pub struct EventManager<Event: Hash + Eq, Payload: Clone> {
103    // needs a list of subscribers
104    emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
105}
106
107impl<Event: Hash + Eq, Payload: Clone> EventManager<Event, Payload> {
108    /// Create the EventManager, which can send any event, or create emitters and subscribers for a
109    /// single event.
110    pub fn new() -> Self {
111        Self {
112            emitters: Arc::new(Mutex::new(HashMap::new())),
113        }
114    }
115    /// Get an Emitter that sends the specified event
116    pub fn emitter(&mut self, event: Event) -> Emitter<Event, Payload> {
117        Emitter(self.emitters.lock().unwrap().entry(event).or_default().clone(), PhantomData)
118    }
119    /// Get an Emitter that can send any event
120    pub fn emitter_any(&mut self) -> EmitterAny<Event, Payload> {
121        EmitterAny{emitters: self.emitters.clone()}
122    }
123    /// Create a subscriber that will trigger on each of the specified event
124    pub fn subscribe(&mut self, event: Event) -> Subscriber<Event, Payload> {
125        let (send, recv) = unbounded();
126        let emitter = InnerEmitter::Multiple(InnerEmitterMultiple { send });
127        // let emitter_data_lock = self.emitters.entry(event.clone()).or_default();
128
129        let sender_sender = self.insert_emitter(event, emitter);
130        Subscriber {
131            // list: list_clone,
132            sender_sender,
133            recv,
134            _event: PhantomData,
135        }
136    }
137    /// Create a subscriber that will trigger only once
138    pub fn once(&mut self, event: Event) -> SubscriberOnce<Event, Payload> {
139        let (send, recv) = channel();
140        let emitter = InnerEmitter::Once(InnerEmitterOnce { send });
141        self.insert_emitter(event, emitter);
142        SubscriberOnce {
143            recv,
144            _event: PhantomData,
145        }
146    }
147    fn insert_emitter(
148        &mut self,
149        event: Event,
150        emitter: InnerEmitter<Payload>,
151    ) -> SyncSender<UnboundedSender<Payload>> {
152        let mut lock = self.emitters.lock().unwrap();
153        let emitter_data_lock = lock.entry(event).or_default();
154        emitter_data_lock
155            .lock()
156            .unwrap()
157            .emitters
158            .push_back(emitter);
159        let sender = emitter_data_lock.lock().unwrap().sender_sender.clone();
160        sender
161        // list.clone()
162        // match self.emitters.get_mut(&event) {
163        //     Some(list) => list.lock().unwrap().push_back(emitter),
164        //     None => {
165        //         let mut list = LinkedList::new();
166        //         list.push_back(emitter);
167        //         self.emitters.insert(event, Arc::new(Mutex::new(list)));
168        //     }
169        // }
170    }
171    fn handle_cloned_subscribers(emitter_data_lock: &Arc<Mutex<EmitterData<Payload>>>) {
172        let mut emitter_data = emitter_data_lock.lock().unwrap();
173        let sender_iter = emitter_data.sender_recv.try_iter();
174        let emitters: Vec<_> = sender_iter
175            .into_iter()
176            .map(|send| InnerEmitter::Multiple(InnerEmitterMultiple { send }))
177            .collect();
178        emitter_data.emitters.extend(emitters);
179    }
180    /// Emit an event with the specified Payload
181    pub fn emit(&mut self, event: Event, payload: Payload) {
182        if let Some(emitter_data_lock) = self.emitters.lock().unwrap().get_mut(&event) {
183            Self::handle_cloned_subscribers(emitter_data_lock);
184            let mut emitter_data = emitter_data_lock.lock().unwrap();
185            // drop(sender_iter);
186            // emitter_data.emitters.push_back(emitter);
187            let list = &mut emitter_data.emitters;
188            EventManager::<Event, Payload>::list_emit(list, payload);
189        }
190    }
191    fn list_emit(list_guard: &mut LinkedList<InnerEmitter<Payload>>, payload: Payload) {
192        // let mut list_guard: MutexGuard<'_, LinkedList<InnerEmitter<Payload>>> =
193        // list.lock().unwrap();
194        let list = std::mem::take(&mut *list_guard);
195        let list: LinkedList<InnerEmitter<Payload>> = list
196            .into_iter()
197            .filter_map(|emitter| {
198                match emitter {
199                    InnerEmitter::Multiple(ref emitter) => {
200                        if emitter.send.unbounded_send(payload.clone()).is_err() {
201                            return None;
202                        }
203                    }
204                    InnerEmitter::Once(emitter) => {
205                        // The recv in a subscriber has been dropped, this is fine
206                        let _ = emitter.send.send(payload.clone());
207                        return None;
208                    }
209                }
210                Some(emitter)
211            })
212            .collect();
213        *list_guard = list;
214    }
215}
216
217/// An emitter which can only send the event used to create it
218/// ```
219/// # use hash_events::{EventManager, StringEvent};
220/// # use futures::executor::block_on;
221/// # use futures_lite::StreamExt;
222/// let mut manager: EventManager<StringEvent, i32> = EventManager::new();
223/// let mut subscriber = manager.subscribe("Event".into());
224/// let mut emitter = manager.emitter("Event".into());
225/// drop(manager);
226/// emitter.emit(57);
227/// assert_eq!(block_on(subscriber.next()), Some(57));
228/// drop(emitter);
229/// assert_eq!(block_on(subscriber.next()), None);
230/// ```
231#[derive(Debug, Clone)]
232pub struct Emitter<Event, Payload>(Arc<Mutex<EmitterData<Payload>>>, PhantomData<Event>);
233
234impl<Event: Hash + Eq, Payload: Clone> Emitter<Event, Payload> {
235    /// Emit the event used in the Emitter's creation, with the specified Payload
236    pub fn emit(&mut self, payload: Payload) {
237        EventManager::<Event, Payload>::handle_cloned_subscribers(&mut self.0);
238        EventManager::<Event, Payload>::list_emit(&mut self.0.lock().unwrap().emitters, payload)
239    }
240}
241
242pub struct EmitterAny<Event: Hash + Eq, Payload: Clone> {
243    // needs a list of subscribers
244    emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
245}
246
247impl<Event: Hash + Eq, Payload: Clone> EmitterAny<Event, Payload> {
248    /// Emit an event with any payload
249    pub fn emit(&mut self, event: Event, payload: Payload) {
250        if let Some(emitter_data_lock) = self.emitters.lock().unwrap().get_mut(&event) {
251            EventManager::<Event, Payload>::handle_cloned_subscribers(emitter_data_lock);
252            let mut emitter_data = emitter_data_lock.lock().unwrap();
253            // drop(sender_iter);
254            // emitter_data.emitters.push_back(emitter);
255            let list = &mut emitter_data.emitters;
256            EventManager::<Event, Payload>::list_emit(list, payload);
257        }
258    }
259}
260
261impl<Event, Payload> Stream for Subscriber<Event, Payload> {
262    type Item = Payload;
263    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
264        // self.recv.poll_next_unpin(cx)
265        unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll_next(cx)
266    }
267}
268
269type Canceled = futures::channel::oneshot::Canceled;
270impl<Event, Payload> Future for SubscriberOnce<Event, Payload> {
271    type Output = Result<Payload, Canceled>;
272
273    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274        unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll(cx)
275        // self.recv.poll_unpin(cx)
276        // self.recv.poll(cx)
277    }
278}
279
280#[derive(Debug, PartialEq, Eq, Hash)]
281/// An event for any type that can be converted into a string.
282pub struct StringEvent(String);
283
284impl<T: Into<String>> From<T> for StringEvent {
285    fn from(value: T) -> Self {
286        Self(Into::<String>::into(value))
287    }
288}
289
290#[cfg(test)]
291pub mod tests {
292    use crate::{EventManager, StringEvent};
293    use futures_lite::StreamExt;
294
295    #[test]
296    fn test_events_once() {
297        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
298        let subscriber = manager.once("Event".into());
299        manager.emit("Event".into(), 57);
300        assert_eq!(futures::executor::block_on(subscriber), Ok(57));
301    }
302
303    #[test]
304    fn test_events_any() {
305        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
306        let mut emitter = manager.emitter_any();
307        let subscriber = manager.once("Event".into());
308        emitter.emit("Event".into(), 57);
309        assert_eq!(futures::executor::block_on(subscriber), Ok(57));
310    }
311
312    #[test]
313    fn test_events_multiple() {
314        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
315        let mut subscriber = manager.subscribe("Event".into());
316        manager.emit("Event".into(), 57);
317        assert_eq!(futures::executor::block_on(subscriber.next()), Some(57));
318        drop(manager);
319        assert_eq!(futures::executor::block_on(subscriber.next()), None);
320    }
321    // fn inc(n: &mut i32) -> i32 {
322    //     *n = *n + 1;
323    //     *n
324    // }
325    #[test]
326    fn test_clone_subscriber() {
327        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
328        let mut subscriber = manager.subscribe("Event".into());
329        manager.emit("Event".into(), 57);
330        assert_eq!(futures::executor::block_on(subscriber.next()), Some(57));
331        let mut sub2 = subscriber.clone();
332        manager.emit("Event".into(), 58);
333        assert_eq!(futures::executor::block_on(subscriber.next()), Some(58));
334        assert_eq!(futures::executor::block_on(sub2.next()), Some(58));
335
336        drop(manager);
337        assert_eq!(futures::executor::block_on(subscriber.next()), None);
338        assert_eq!(futures::executor::block_on(sub2.next()), None);
339    }
340}