hash_events 0.1.2

An MPMC event system that allows for custom events. Events are emitted with a payload, and subscribers to that event are stream of payloads. For each event, all subscribers are triggered.
Documentation
#[doc = include_str!("../README.md")]
use futures::channel::{
    mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
    oneshot::{channel, Receiver, Sender},
};
use futures_lite::{Future, Stream};
use std::sync::mpsc::{channel as sync_channel, Receiver as SyncReceiver, Sender as SyncSender};
use std::{
    collections::{HashMap, LinkedList},
    hash::Hash,
    marker::PhantomData,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll},
};

// type EmitterList<Payload> = Arc<Mutex<LinkedList<InnerEmitter<Payload>>>>;
/// A stream to receive events. Any event emitted by an Emitter or EventManager will
/// trigger all matching subscribers. Subscribers can be obtained from EventManagers or by cloning
/// another subscriber.
/// ```
/// # use hash_events::{EventManager, StringEvent};
/// # use futures::executor::block_on;
/// # use futures_lite::StreamExt;
/// let mut manager: EventManager<StringEvent, i32> = EventManager::new();
/// let mut sub1 = manager.subscribe("Event".into());
/// let mut sub2 = sub1.clone();
/// manager.emit("Event".into(), 57);
/// assert_eq!(block_on(sub1.next()), Some(57));
/// assert_eq!(block_on(sub2.next()), Some(57));
/// drop(manager);
/// assert_eq!(block_on(sub2.next()), None);
/// ```
pub struct Subscriber<Event, Payload> {
    // list: EmitterList<Payload>,
    sender_sender: SyncSender<UnboundedSender<Payload>>,
    recv: UnboundedReceiver<Payload>,
    _event: PhantomData<Event>,
}
impl<Event, Payload> Clone for Subscriber<Event, Payload> {
    fn clone(&self) -> Self {
        let (send, recv) = unbounded();
        // let emitter = InnerEmitter::Multiple(InnerEmitterMultiple { send });
        // Instead of containing a list, send the new sender through a channel
        // and check for new senders before emitting events
        // self.list.lock().unwrap().push_back(emitter);
        let sender_sender = self.sender_sender.clone();
        sender_sender.send(send).unwrap();
        Self {
            // list: self.list.clone(),
            sender_sender,
            recv,
            _event: PhantomData,
        }
    }
}

/// A subscriber that will only trigger once.
pub struct SubscriberOnce<Event, Payload> {
    recv: Receiver<Payload>,
    _event: PhantomData<Event>,
}
#[derive(Debug)]
struct InnerEmitterMultiple<Payload> {
    send: UnboundedSender<Payload>,
}
#[derive(Debug)]
struct InnerEmitterOnce<Payload> {
    send: Sender<Payload>,
}
#[derive(Debug)]
enum InnerEmitter<Payload> {
    Multiple(InnerEmitterMultiple<Payload>),
    Once(InnerEmitterOnce<Payload>),
}

#[derive(Debug)]
struct EmitterData<Payload> {
    emitters: LinkedList<InnerEmitter<Payload>>,
    sender_sender: SyncSender<UnboundedSender<Payload>>,
    sender_recv: SyncReceiver<UnboundedSender<Payload>>,
}
impl<Payload> Default for EmitterData<Payload> {
    fn default() -> Self {
        let (send, recv) = sync_channel();
        Self {
            emitters: LinkedList::default(),
            sender_sender: send,
            sender_recv: recv,
        }
    }
}

/// EventManager is used to get subscribers and emitters, but can also emit events itself
/// ```
/// # use hash_events::{EventManager, StringEvent};
/// let mut manager: EventManager<StringEvent, i32> = EventManager::new();
/// let subscriber = manager.once("Event".into());
/// manager.emit("Event".into(), 57);
/// assert_eq!(futures::executor::block_on(subscriber), Ok(57));
/// ```
pub struct EventManager<Event: Hash + Eq, Payload: Clone> {
    // needs a list of subscribers
    emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
}

impl<Event: Hash + Eq, Payload: Clone> EventManager<Event, Payload> {
    /// Create the EventManager, which can send any event, or create emitters and subscribers for a
    /// single event.
    pub fn new() -> Self {
        Self {
            emitters: Arc::new(Mutex::new(HashMap::new())),
        }
    }
    /// Get an Emitter that sends the specified event
    pub fn emitter(&mut self, event: Event) -> Emitter<Event, Payload> {
        Emitter(self.emitters.lock().unwrap().entry(event).or_default().clone(), PhantomData)
    }
    /// Get an Emitter that can send any event
    pub fn emitter_any(&mut self) -> EmitterAny<Event, Payload> {
        EmitterAny{emitters: self.emitters.clone()}
    }
    /// Create a subscriber that will trigger on each of the specified event
    pub fn subscribe(&mut self, event: Event) -> Subscriber<Event, Payload> {
        let (send, recv) = unbounded();
        let emitter = InnerEmitter::Multiple(InnerEmitterMultiple { send });
        // let emitter_data_lock = self.emitters.entry(event.clone()).or_default();

        let sender_sender = self.insert_emitter(event, emitter);
        Subscriber {
            // list: list_clone,
            sender_sender,
            recv,
            _event: PhantomData,
        }
    }
    /// Create a subscriber that will trigger only once
    pub fn once(&mut self, event: Event) -> SubscriberOnce<Event, Payload> {
        let (send, recv) = channel();
        let emitter = InnerEmitter::Once(InnerEmitterOnce { send });
        self.insert_emitter(event, emitter);
        SubscriberOnce {
            recv,
            _event: PhantomData,
        }
    }
    fn insert_emitter(
        &mut self,
        event: Event,
        emitter: InnerEmitter<Payload>,
    ) -> SyncSender<UnboundedSender<Payload>> {
        let mut lock = self.emitters.lock().unwrap();
        let emitter_data_lock = lock.entry(event).or_default();
        emitter_data_lock
            .lock()
            .unwrap()
            .emitters
            .push_back(emitter);
        let sender = emitter_data_lock.lock().unwrap().sender_sender.clone();
        sender
        // list.clone()
        // match self.emitters.get_mut(&event) {
        //     Some(list) => list.lock().unwrap().push_back(emitter),
        //     None => {
        //         let mut list = LinkedList::new();
        //         list.push_back(emitter);
        //         self.emitters.insert(event, Arc::new(Mutex::new(list)));
        //     }
        // }
    }
    fn handle_cloned_subscribers(emitter_data_lock: &Arc<Mutex<EmitterData<Payload>>>) {
        let mut emitter_data = emitter_data_lock.lock().unwrap();
        let sender_iter = emitter_data.sender_recv.try_iter();
        let emitters: Vec<_> = sender_iter
            .into_iter()
            .map(|send| InnerEmitter::Multiple(InnerEmitterMultiple { send }))
            .collect();
        emitter_data.emitters.extend(emitters);
    }
    /// Emit an event with the specified Payload
    pub fn emit(&mut self, event: Event, payload: Payload) {
        if let Some(emitter_data_lock) = self.emitters.lock().unwrap().get_mut(&event) {
            Self::handle_cloned_subscribers(emitter_data_lock);
            let mut emitter_data = emitter_data_lock.lock().unwrap();
            // drop(sender_iter);
            // emitter_data.emitters.push_back(emitter);
            let list = &mut emitter_data.emitters;
            EventManager::<Event, Payload>::list_emit(list, payload);
        }
    }
    fn list_emit(list_guard: &mut LinkedList<InnerEmitter<Payload>>, payload: Payload) {
        // let mut list_guard: MutexGuard<'_, LinkedList<InnerEmitter<Payload>>> =
        // list.lock().unwrap();
        let list = std::mem::take(&mut *list_guard);
        let list: LinkedList<InnerEmitter<Payload>> = list
            .into_iter()
            .filter_map(|emitter| {
                match emitter {
                    InnerEmitter::Multiple(ref emitter) => {
                        if emitter.send.unbounded_send(payload.clone()).is_err() {
                            return None;
                        }
                    }
                    InnerEmitter::Once(emitter) => {
                        // The recv in a subscriber has been dropped, this is fine
                        let _ = emitter.send.send(payload.clone());
                        return None;
                    }
                }
                Some(emitter)
            })
            .collect();
        *list_guard = list;
    }
}

/// An emitter which can only send the event used to create it
/// ```
/// # use hash_events::{EventManager, StringEvent};
/// # use futures::executor::block_on;
/// # use futures_lite::StreamExt;
/// let mut manager: EventManager<StringEvent, i32> = EventManager::new();
/// let mut subscriber = manager.subscribe("Event".into());
/// let mut emitter = manager.emitter("Event".into());
/// drop(manager);
/// emitter.emit(57);
/// assert_eq!(block_on(subscriber.next()), Some(57));
/// drop(emitter);
/// assert_eq!(block_on(subscriber.next()), None);
/// ```
#[derive(Debug, Clone)]
pub struct Emitter<Event, Payload>(Arc<Mutex<EmitterData<Payload>>>, PhantomData<Event>);

impl<Event: Hash + Eq, Payload: Clone> Emitter<Event, Payload> {
    /// Emit the event used in the Emitter's creation, with the specified Payload
    pub fn emit(&mut self, payload: Payload) {
        EventManager::<Event, Payload>::handle_cloned_subscribers(&mut self.0);
        EventManager::<Event, Payload>::list_emit(&mut self.0.lock().unwrap().emitters, payload)
    }
}

pub struct EmitterAny<Event: Hash + Eq, Payload: Clone> {
    // needs a list of subscribers
    emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
}

impl<Event: Hash + Eq, Payload: Clone> EmitterAny<Event, Payload> {
    /// Emit an event with any payload
    pub fn emit(&mut self, event: Event, payload: Payload) {
        if let Some(emitter_data_lock) = self.emitters.lock().unwrap().get_mut(&event) {
            EventManager::<Event, Payload>::handle_cloned_subscribers(emitter_data_lock);
            let mut emitter_data = emitter_data_lock.lock().unwrap();
            // drop(sender_iter);
            // emitter_data.emitters.push_back(emitter);
            let list = &mut emitter_data.emitters;
            EventManager::<Event, Payload>::list_emit(list, payload);
        }
    }
}

impl<Event, Payload> Stream for Subscriber<Event, Payload> {
    type Item = Payload;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // self.recv.poll_next_unpin(cx)
        unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll_next(cx)
    }
}

type Canceled = futures::channel::oneshot::Canceled;
impl<Event, Payload> Future for SubscriberOnce<Event, Payload> {
    type Output = Result<Payload, Canceled>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll(cx)
        // self.recv.poll_unpin(cx)
        // self.recv.poll(cx)
    }
}

#[derive(Debug, PartialEq, Eq, Hash)]
/// An event for any type that can be converted into a string.
pub struct StringEvent(String);

impl<T: Into<String>> From<T> for StringEvent {
    fn from(value: T) -> Self {
        Self(Into::<String>::into(value))
    }
}

#[cfg(test)]
pub mod tests {
    use crate::{EventManager, StringEvent};
    use futures_lite::StreamExt;

    #[test]
    fn test_events_once() {
        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
        let subscriber = manager.once("Event".into());
        manager.emit("Event".into(), 57);
        assert_eq!(futures::executor::block_on(subscriber), Ok(57));
    }

    #[test]
    fn test_events_any() {
        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
        let mut emitter = manager.emitter_any();
        let subscriber = manager.once("Event".into());
        emitter.emit("Event".into(), 57);
        assert_eq!(futures::executor::block_on(subscriber), Ok(57));
    }

    #[test]
    fn test_events_multiple() {
        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
        let mut subscriber = manager.subscribe("Event".into());
        manager.emit("Event".into(), 57);
        assert_eq!(futures::executor::block_on(subscriber.next()), Some(57));
        drop(manager);
        assert_eq!(futures::executor::block_on(subscriber.next()), None);
    }
    // fn inc(n: &mut i32) -> i32 {
    //     *n = *n + 1;
    //     *n
    // }
    #[test]
    fn test_clone_subscriber() {
        let mut manager: EventManager<StringEvent, i32> = EventManager::new();
        let mut subscriber = manager.subscribe("Event".into());
        manager.emit("Event".into(), 57);
        assert_eq!(futures::executor::block_on(subscriber.next()), Some(57));
        let mut sub2 = subscriber.clone();
        manager.emit("Event".into(), 58);
        assert_eq!(futures::executor::block_on(subscriber.next()), Some(58));
        assert_eq!(futures::executor::block_on(sub2.next()), Some(58));

        drop(manager);
        assert_eq!(futures::executor::block_on(subscriber.next()), None);
        assert_eq!(futures::executor::block_on(sub2.next()), None);
    }
}