#[doc = include_str!("../README.md")]
use futures::channel::{
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
oneshot::{channel, Receiver, Sender},
};
use futures_lite::{Future, Stream};
use std::sync::mpsc::{channel as sync_channel, Receiver as SyncReceiver, Sender as SyncSender};
use std::{
collections::{HashMap, LinkedList},
hash::Hash,
marker::PhantomData,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
/// Stream-style subscription handle: each payload sent to it is yielded by its
/// `Stream` implementation.
///
/// `Event` is a compile-time marker only; no value of that type is stored.
pub struct Subscriber<Event, Payload> {
/// Std channel sender used by `Clone` to hand the sending half of a freshly
/// created channel over to the event's emitter data. Note that `SyncSender`
/// is this file's alias for `std::sync::mpsc::Sender`.
sender_sender: SyncSender<UnboundedSender<Payload>>,
/// Receiving half that the `Stream` implementation polls.
recv: UnboundedReceiver<Payload>,
/// Zero-sized marker carrying the `Event` type parameter.
_event: PhantomData<Event>,
}
impl<Event, Payload> Clone for Subscriber<Event, Payload> {
    /// Creates an independent subscriber for the same event.
    ///
    /// A fresh unbounded channel is created and its sending half is queued on
    /// the registration channel shared with the event's emitter data. The
    /// emit paths drain that queue before delivering, so the clone starts
    /// receiving with the next emitted payload. Payloads already buffered in
    /// `self` are not copied into the clone.
    ///
    /// If every emitter-side handle has already been dropped, the clone is
    /// returned as an already-closed stream instead of panicking.
    fn clone(&self) -> Self {
        let (send, recv) = unbounded();
        let sender_sender = self.sender_sender.clone();
        // `send` only fails when the registration receiver is gone, i.e. nothing
        // can emit to this event any more. The rejected sender is dropped together
        // with the error, which closes `recv`, so the new subscriber simply ends.
        let _ = sender_sender.send(send);
        Self {
            sender_sender,
            recv,
            _event: PhantomData,
        }
    }
}
/// One-shot subscription handle; its `Future` implementation polls the
/// wrapped oneshot receiver.
///
/// `Event` is a compile-time marker only; no value of that type is stored.
pub struct SubscriberOnce<Event, Payload> {
/// Receiving half of the oneshot channel created by `EventManager::once`.
recv: Receiver<Payload>,
/// Zero-sized marker carrying the `Event` type parameter.
_event: PhantomData<Event>,
}
/// Emitter-side registration for a stream subscriber (`Subscriber`).
#[derive(Debug)]
struct InnerEmitterMultiple<Payload> {
/// Sending half of the subscriber's unbounded channel.
send: UnboundedSender<Payload>,
}
/// Emitter-side registration for a one-shot subscriber (`SubscriberOnce`).
#[derive(Debug)]
struct InnerEmitterOnce<Payload> {
/// Sending half of the subscriber's oneshot channel; consumed by the single send.
send: Sender<Payload>,
}
/// A single registered subscriber as seen from the emitting side.
#[derive(Debug)]
enum InnerEmitter<Payload> {
/// Stream subscriber; `list_emit` keeps it registered while sends succeed.
Multiple(InnerEmitterMultiple<Payload>),
/// One-shot subscriber; `list_emit` removes it after the first send attempt.
Once(InnerEmitterOnce<Payload>),
}
/// Per-event state: the registered subscribers plus the channel through which
/// cloned `Subscriber`s register themselves.
#[derive(Debug)]
struct EmitterData<Payload> {
/// Subscribers that receive the next emitted payload, in registration order.
emitters: LinkedList<InnerEmitter<Payload>>,
/// Sender handed out (cloned) to every `Subscriber` so its clones can register.
/// `SyncSender` is this file's alias for `std::sync::mpsc::Sender`.
sender_sender: SyncSender<UnboundedSender<Payload>>,
/// Receiver drained by `handle_cloned_subscribers` before each emit.
sender_recv: SyncReceiver<UnboundedSender<Payload>>,
}
impl<Payload> Default for EmitterData<Payload> {
    /// Builds per-event state with no subscribers and a fresh registration
    /// channel whose two halves are both stored in the returned value.
    fn default() -> Self {
        let (sender_sender, sender_recv) = sync_channel();
        Self {
            emitters: LinkedList::new(),
            sender_sender,
            sender_recv,
        }
    }
}
/// Central registry that maps each event key to its subscribers and lets
/// callers subscribe to, and emit payloads for, those events.
pub struct EventManager<Event: Hash + Eq, Payload: Clone> {
/// Shared, lock-protected map from event key to that event's shared state.
/// The outer `Arc` is cloned into `EmitterAny`; the inner ones into `Emitter`.
emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
}
impl<Event: Hash + Eq, Payload: Clone> EventManager<Event, Payload> {
    /// Creates a manager with an empty event registry.
    pub fn new() -> Self {
        Self {
            emitters: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Returns an [`Emitter`] bound to `event`, creating the event's entry if
    /// it does not exist yet.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    pub fn emitter(&mut self, event: Event) -> Emitter<Event, Payload> {
        let data = self
            .emitters
            .lock()
            .unwrap()
            .entry(event)
            .or_default()
            .clone();
        Emitter(data, PhantomData)
    }

    /// Returns an [`EmitterAny`] sharing this manager's registry, able to emit
    /// for any event key.
    pub fn emitter_any(&mut self) -> EmitterAny<Event, Payload> {
        EmitterAny {
            emitters: Arc::clone(&self.emitters),
        }
    }

    /// Registers a stream subscriber for `event` and returns its receiving
    /// handle. The event's entry is created if it does not exist yet.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn subscribe(&mut self, event: Event) -> Subscriber<Event, Payload> {
        let (send, recv) = unbounded();
        let sender_sender =
            self.insert_emitter(event, InnerEmitter::Multiple(InnerEmitterMultiple { send }));
        Subscriber {
            sender_sender,
            recv,
            _event: PhantomData,
        }
    }

    /// Registers a one-shot subscriber for `event` and returns the future
    /// that receives the payload. The registration is removed by the first
    /// emit that reaches it.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn once(&mut self, event: Event) -> SubscriberOnce<Event, Payload> {
        let (send, recv) = channel();
        self.insert_emitter(event, InnerEmitter::Once(InnerEmitterOnce { send }));
        SubscriberOnce {
            recv,
            _event: PhantomData,
        }
    }

    /// Appends `emitter` to the subscriber list of `event` (creating the entry
    /// on demand) and returns a clone of that event's registration sender.
    fn insert_emitter(
        &mut self,
        event: Event,
        emitter: InnerEmitter<Payload>,
    ) -> SyncSender<UnboundedSender<Payload>> {
        let mut registry = self.emitters.lock().unwrap();
        // A single lock acquisition covers both the push and the sender clone.
        let mut data = registry.entry(event).or_default().lock().unwrap();
        data.emitters.push_back(emitter);
        data.sender_sender.clone()
    }

    /// Moves every sender queued by `Subscriber::clone` into the event's
    /// subscriber list. Never blocks: only already-queued senders are taken.
    fn handle_cloned_subscribers(emitter_data_lock: &Arc<Mutex<EmitterData<Payload>>>) {
        let mut guard = emitter_data_lock.lock().unwrap();
        // Reborrow through the guard once so the two fields can be borrowed
        // disjointly and the queue can be drained straight into the list.
        let data = &mut *guard;
        data.emitters.extend(
            data.sender_recv
                .try_iter()
                .map(|send| InnerEmitter::Multiple(InnerEmitterMultiple { send })),
        );
    }

    /// Delivers a clone of `payload` to every subscriber registered for
    /// `event`. Does nothing when no entry exists for `event`.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn emit(&mut self, event: Event, payload: Payload) {
        // Clone the per-event handle so the registry lock is released before
        // the fan-out; only the per-event lock is held while sending.
        let target = self.emitters.lock().unwrap().get(&event).cloned();
        if let Some(emitter_data_lock) = target {
            Self::handle_cloned_subscribers(&emitter_data_lock);
            let mut emitter_data = emitter_data_lock.lock().unwrap();
            Self::list_emit(&mut emitter_data.emitters, payload);
        }
    }

    /// Sends a clone of `payload` to each entry of `list_guard`, keeping only
    /// the stream subscribers whose send succeeded. One-shot subscribers are
    /// always removed, and so are stream subscribers whose send failed.
    /// Relative order of the surviving entries is preserved.
    fn list_emit(list_guard: &mut LinkedList<InnerEmitter<Payload>>, payload: Payload) {
        let pending = std::mem::take(&mut *list_guard);
        for emitter in pending {
            match emitter {
                InnerEmitter::Multiple(multi) => {
                    if multi.send.unbounded_send(payload.clone()).is_ok() {
                        list_guard.push_back(InnerEmitter::Multiple(multi));
                    }
                }
                InnerEmitter::Once(once) => {
                    // Best effort: the receiving future may already be gone.
                    let _ = once.send.send(payload.clone());
                }
            }
        }
    }
}
/// Handle that emits payloads for one fixed event.
///
/// Field 0 is the shared per-event state obtained from the manager's registry;
/// field 1 is a zero-sized marker carrying the `Event` type parameter.
#[derive(Debug, Clone)]
pub struct Emitter<Event, Payload>(Arc<Mutex<EmitterData<Payload>>>, PhantomData<Event>);
impl<Event: Hash + Eq, Payload: Clone> Emitter<Event, Payload> {
    /// Delivers a clone of `payload` to every subscriber of this emitter's
    /// event, after first registering any subscribers created via
    /// `Subscriber::clone`.
    ///
    /// # Panics
    /// Panics if the per-event mutex is poisoned.
    pub fn emit(&mut self, payload: Payload) {
        let shared = &self.0;
        EventManager::<Event, Payload>::handle_cloned_subscribers(shared);
        let mut data = shared.lock().unwrap();
        EventManager::<Event, Payload>::list_emit(&mut data.emitters, payload);
    }
}
/// Handle that can emit payloads for any event key known to the manager it
/// was created from.
pub struct EmitterAny<Event: Hash + Eq, Payload: Clone> {
/// The manager's registry, shared through the outer `Arc`.
emitters: Arc<Mutex<HashMap<Event, Arc<Mutex<EmitterData<Payload>>>>>>,
}
impl<Event: Hash + Eq, Payload: Clone> EmitterAny<Event, Payload> {
    /// Delivers a clone of `payload` to every subscriber registered for
    /// `event`. Does nothing when the registry has no entry for `event`.
    ///
    /// # Panics
    /// Panics if one of the internal mutexes is poisoned.
    pub fn emit(&mut self, event: Event, payload: Payload) {
        // Clone the per-event handle so the shared registry lock is released
        // before the fan-out; only the per-event lock is held while sending.
        let target = self.emitters.lock().unwrap().get(&event).cloned();
        if let Some(emitter_data_lock) = target {
            EventManager::<Event, Payload>::handle_cloned_subscribers(&emitter_data_lock);
            let mut emitter_data = emitter_data_lock.lock().unwrap();
            EventManager::<Event, Payload>::list_emit(&mut emitter_data.emitters, payload);
        }
    }
}
impl<Event, Payload> Stream for Subscriber<Event, Payload> {
type Item = Payload;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll_next(cx)
}
}
/// Local alias for the `futures` oneshot cancellation error; it is the error
/// type in the output of `SubscriberOnce`'s `Future` implementation.
type Canceled = futures::channel::oneshot::Canceled;
impl<Event, Payload> Future for SubscriberOnce<Event, Payload> {
type Output = Result<Payload, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe { self.map_unchecked_mut(|thing| &mut thing.recv) }.poll(cx)
}
}
/// Event key backed by an owned `String`, usable as the `Event` type of an
/// `EventManager`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct StringEvent(String);

impl<T> From<T> for StringEvent
where
    T: Into<String>,
{
    /// Wraps anything convertible into a `String` (for example `&str` or
    /// `String`) as an event key.
    fn from(value: T) -> Self {
        Self(value.into())
    }
}
// Tests for the subscribe/emit round trip. Each test drives the returned
// future or stream to completion with `futures::executor::block_on`.
#[cfg(test)]
pub mod tests {
use crate::{EventManager, StringEvent};
use futures_lite::StreamExt;
// A one-shot subscriber receives the payload emitted through the manager.
#[test]
fn test_events_once() {
let mut manager: EventManager<StringEvent, i32> = EventManager::new();
let subscriber = manager.once("Event".into());
manager.emit("Event".into(), 57);
assert_eq!(futures::executor::block_on(subscriber), Ok(57));
}
// An `EmitterAny` created before the subscription still reaches it, because
// it shares the manager's registry.
#[test]
fn test_events_any() {
let mut manager: EventManager<StringEvent, i32> = EventManager::new();
let mut emitter = manager.emitter_any();
let subscriber = manager.once("Event".into());
emitter.emit("Event".into(), 57);
assert_eq!(futures::executor::block_on(subscriber), Ok(57));
}
// A stream subscriber yields the emitted payload, then ends once the manager
// (the only holder of the sending side here) is dropped.
#[test]
fn test_events_multiple() {
let mut manager: EventManager<StringEvent, i32> = EventManager::new();
let mut subscriber = manager.subscribe("Event".into());
manager.emit("Event".into(), 57);
assert_eq!(futures::executor::block_on(subscriber.next()), Some(57));
drop(manager);
assert_eq!(futures::executor::block_on(subscriber.next()), None);
}
// A cloned subscriber only sees payloads emitted after the clone was made
// (58, not 57), and both handles end after the manager is dropped.
#[test]
fn test_clone_subscriber() {
let mut manager: EventManager<StringEvent, i32> = EventManager::new();
let mut subscriber = manager.subscribe("Event".into());
manager.emit("Event".into(), 57);
assert_eq!(futures::executor::block_on(subscriber.next()), Some(57));
let mut sub2 = subscriber.clone();
manager.emit("Event".into(), 58);
assert_eq!(futures::executor::block_on(subscriber.next()), Some(58));
assert_eq!(futures::executor::block_on(sub2.next()), Some(58));
drop(manager);
assert_eq!(futures::executor::block_on(subscriber.next()), None);
assert_eq!(futures::executor::block_on(sub2.next()), None);
}
}