use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::rc::{Rc, Weak};
use derivative::Derivative;
use futures::channel::oneshot::*;
use yew::prelude::*;
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
struct IndexedSet<T> {
set: HashMap<usize, T>,
gen_: usize,
}
impl<T> IndexedSet<T> {
fn insert(&mut self, v: T) -> usize {
let key = self.gen_;
if self.set.insert(key, v).is_some() {
tracing::warn!("Collision");
};
self.gen_ += 1;
key
}
fn remove(&mut self, key: usize) {
self.set.remove(&key);
}
fn iter(&self) -> impl Iterator<Item = &T> {
self.set.values()
}
fn drain(&mut self) -> impl Iterator<Item = T> {
let mut x = Box::default();
std::mem::swap(&mut self.set, &mut x);
x.into_values()
}
}
type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
struct PubSubInternal<T: Clone> {
deleted: Cell<bool>,
listeners: RefCell<ListenerSet<T>>,
once_listeners: RefCell<ListenerOnceSet<T>>,
}
impl<T: Clone> PubSubInternal<T> {
fn emit(&self, val: T) {
if self.deleted.get() {
tracing::warn!("`Callback` invoked after `PubSub` dropped");
}
for listener in self.listeners.borrow().iter() {
listener(val.clone());
}
for listener in self.once_listeners.borrow_mut().drain() {
listener(val.clone());
}
}
}
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
pub trait AddListener<T> {
fn add_listener(&self, f: T) -> Subscription;
}
impl<T: Clone + 'static> PubSub<T> {
pub fn emit(&self, val: T) {
self.0.emit(val);
}
#[must_use]
pub fn callback(&self) -> Callback<T> {
let internal = self.0.clone();
Callback::from(move |val: T| internal.emit(val))
}
pub fn add_notify_listener(&self, cb: &Callback<()>) -> Subscription {
let cb = cb.clone();
self.add_listener(move |_: T| cb.emit(()))
}
#[must_use]
pub fn as_boxfn(&self) -> Box<dyn Fn(T) + 'static> {
let internal = PubSub(self.0.clone());
Box::new(move |val: T| internal.emit(val))
}
pub async fn read_next(&self) -> Result<T, Canceled> {
let (sender, receiver) = channel::<T>();
let f = move |x| sender.send(x).unwrap_or(());
self.0.once_listeners.borrow_mut().insert(Box::new(f));
receiver.await
}
#[must_use]
pub fn subscriber(&self) -> Subscriber<T> {
Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
}
}
impl<T: Clone> Drop for PubSub<T> {
fn drop(&mut self) {
self.0.deleted.set(true);
}
}
impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
fn add_listener(&self, f: Callback<T>) -> Subscription {
let internal = self.0.clone();
let cb = Box::new(move |x| f.emit(x));
let key = self.0.listeners.borrow_mut().insert(cb);
Subscription(Box::new(move || {
internal.listeners.borrow_mut().remove(key)
}))
}
}
impl<T, U> AddListener<U> for PubSub<T>
where
T: Clone + 'static,
U: Fn(T) + 'static,
{
fn add_listener(&self, f: U) -> Subscription {
let internal = self.0.clone();
let key = self.0.listeners.borrow_mut().insert(Box::new(f));
Subscription(Box::new(move || {
internal.listeners.borrow_mut().remove(key)
}))
}
}
#[derive(Clone)]
pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
impl<T, U> AddListener<U> for Subscriber<T>
where
T: Clone + 'static,
U: Fn(T) + 'static,
{
fn add_listener(&self, f: U) -> Subscription {
if let Some(internal) = self.0.upgrade() {
let key = internal.listeners.borrow_mut().insert(Box::new(f));
Subscription(Box::new(move || {
internal.listeners.borrow_mut().remove(key)
}))
} else {
Subscription(Box::new(|| {}))
}
}
}
impl<T: Clone> Default for Subscriber<T> {
fn default() -> Self {
Self(Weak::new())
}
}
impl<T: Clone> PartialEq for Subscriber<T> {
fn eq(&self, other: &Self) -> bool {
match (self.0.upgrade(), other.0.upgrade()) {
(Some(x), Some(y)) => std::ptr::eq(
&*x as *const PubSubInternal<T>,
&*y as *const PubSubInternal<T>,
),
(None, None) => true,
_ => false,
}
}
}
#[must_use]
pub struct Subscription(Box<dyn Fn()>);
impl Drop for Subscription {
fn drop(&mut self) {
(*self.0)();
}
}