perspective_viewer/utils/
pubsub.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::cell::{Cell, RefCell};
14use std::collections::HashMap;
15use std::rc::{Rc, Weak};
16
17use derivative::Derivative;
18use futures::channel::oneshot::*;
19use yew::prelude::*;
20
21/// An internal `HashSet` variant which supports unconstrained `T` e.g.
22/// without `Hash`, via returning a unique `usize` index for each insert
23/// which can be used for a reciprocal `remove(x: usize)`.
24#[derive(Derivative)]
25#[derivative(Default(bound = ""))]
26struct IndexedSet<T> {
27    set: HashMap<usize, T>,
28    gen_: usize,
29}
30
31impl<T> IndexedSet<T> {
32    fn insert(&mut self, v: T) -> usize {
33        let key = self.gen_;
34        self.set.insert(key, v);
35        self.gen_ += 1;
36        key
37    }
38
39    fn remove(&mut self, key: usize) {
40        self.set.remove(&key);
41    }
42
43    fn iter(&self) -> impl Iterator<Item = &T> {
44        self.set.values()
45    }
46
47    fn drain(&mut self) -> impl Iterator<Item = T> {
48        let mut x = Box::default();
49        std::mem::swap(&mut self.set, &mut x);
50        x.into_values()
51    }
52}
53
54type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
55type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
56
57#[derive(Derivative)]
58#[derivative(Default(bound = ""))]
59pub struct PubSubInternal<T: Clone> {
60    deleted: Cell<bool>,
61    listeners: RefCell<ListenerSet<T>>,
62    once_listeners: RefCell<ListenerOnceSet<T>>,
63}
64
65impl<T: Clone> PubSubInternal<T> {
66    fn emit(&self, val: T) {
67        if self.deleted.get() {
68            tracing::warn!("`Callback` invoked after `PubSub` dropped");
69        }
70
71        for listener in self.listeners.borrow().iter() {
72            listener(val.clone());
73        }
74
75        for listener in self.once_listeners.borrow_mut().drain() {
76            listener(val.clone());
77        }
78    }
79}
80
81/// A pub/sub struct which allows many listeners to subscribe to many
82/// publishers, without leaking callbacks as listeners are dropped.
83///
84/// Unlike `mpsc` etc., `PubSub` has no internal queue and is completely
85/// synchronous.
86#[derive(Derivative)]
87#[derivative(Default(bound = ""))]
88pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
89
90unsafe impl<T: Clone> Send for PubSub<T> {}
91unsafe impl<T: Clone> Sync for PubSub<T> {}
92
93pub trait AddListener<T> {
94    /// Register a listener to this `PubSub<_>`, which will be automatically
95    /// deregistered when the return `Subscription` is dropped.
96    ///
97    /// # Arguments
98    /// - `f` The callback, presumably a function-like type.
99    fn add_listener(&self, f: T) -> Subscription;
100}
101
102impl<T: Clone + 'static> PubSub<T> {
103    /// Emit a value to all listeners.
104    ///
105    /// # Arguments
106    /// - `val` The value to emit.
107    pub fn emit(&self, val: T) {
108        self.0.emit(val);
109    }
110
111    /// Get this `PubSub<_>`'s `.emit_all()` method as a `Callback<T>`.
112    pub fn callback(&self) -> Callback<T> {
113        let internal = self.0.clone();
114        Callback::from(move |val: T| internal.emit(val))
115    }
116
117    pub fn as_boxfn(&self) -> Box<dyn Fn(T) + Send + Sync + 'static> {
118        let internal = PubSub(self.0.clone());
119        Box::new(move |val: T| internal.emit(val))
120    }
121
122    /// Await this `PubSub<_>`'s next call to `emit_all()`, once.
123    pub async fn listen_once(&self) -> Result<T, Canceled> {
124        let (sender, receiver) = channel::<T>();
125        let f = move |x| sender.send(x).unwrap_or(());
126        self.0.once_listeners.borrow_mut().insert(Box::new(f));
127        receiver.await
128    }
129
130    /// Create a `Subscriber` from this `PubSub`, which is the reciprocal of
131    /// `PubSub::callback` (a struct which only allows sending), a struct which
132    /// only allows receiving via `Subscriber::add_listener`.
133    pub fn subscriber(&self) -> Subscriber<T> {
134        Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
135    }
136}
137
138impl<T: Clone> Drop for PubSub<T> {
139    fn drop(&mut self) {
140        self.0.deleted.set(true);
141    }
142}
143
144impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
145    fn add_listener(&self, f: Callback<T>) -> Subscription {
146        let internal = self.0.clone();
147        let cb = Box::new(move |x| f.emit(x));
148        let key = self.0.listeners.borrow_mut().insert(cb);
149        Subscription(Box::new(move || {
150            internal.listeners.borrow_mut().remove(key)
151        }))
152    }
153}
154
155impl<T, U> AddListener<U> for PubSub<T>
156where
157    T: Clone + 'static,
158    U: Fn(T) + 'static,
159{
160    fn add_listener(&self, f: U) -> Subscription {
161        let internal = self.0.clone();
162        let key = self.0.listeners.borrow_mut().insert(Box::new(f));
163        Subscription(Box::new(move || {
164            internal.listeners.borrow_mut().remove(key)
165        }))
166    }
167}
168
169/// Like a `PubSub` without `PubSub::emit`; the reciprocal of
170/// `PubSub::callback`. `Subscriber` does not keep the parent `PubSub` alive.
171#[derive(Clone)]
172pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
173
174impl<T, U> AddListener<U> for Subscriber<T>
175where
176    T: Clone + 'static,
177    U: Fn(T) + 'static,
178{
179    fn add_listener(&self, f: U) -> Subscription {
180        if let Some(internal) = self.0.upgrade() {
181            let key = internal.listeners.borrow_mut().insert(Box::new(f));
182            Subscription(Box::new(move || {
183                internal.listeners.borrow_mut().remove(key)
184            }))
185        } else {
186            Subscription(Box::new(|| {}))
187        }
188    }
189}
190
191impl<T: Clone> Default for Subscriber<T> {
192    fn default() -> Self {
193        Self(Weak::new())
194    }
195}
196
197impl<T: Clone> PartialEq for Subscriber<T> {
198    fn eq(&self, other: &Self) -> bool {
199        match (self.0.upgrade(), other.0.upgrade()) {
200            (Some(x), Some(y)) => std::ptr::eq(
201                &*x as *const PubSubInternal<T>,
202                &*y as *const PubSubInternal<T>,
203            ),
204            (None, None) => true,
205            _ => false,
206        }
207    }
208}
209
/// A guard tying the lifetime of a registered listener to a value: when the
/// `Subscription` is dropped, the associated listener is deregistered.
///
/// The wrapped `Fn` is the deregister closure supplied by whoever issued the
/// `Subscription` (e.g. a `PubSub<T>`).
#[must_use]
pub struct Subscription(Box<dyn Fn()>);

impl Drop for Subscription {
    /// Run the deregister closure.
    fn drop(&mut self) {
        let deregister: &dyn Fn() = self.0.as_ref();
        deregister();
    }
}
222}