Skip to main content

perspective_viewer/utils/
pubsub.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::cell::{Cell, RefCell};
14use std::collections::HashMap;
15use std::rc::{Rc, Weak};
16
17use derivative::Derivative;
18use futures::channel::oneshot::*;
19use yew::prelude::*;
20
21/// A simple collection with lookup and remove with stable indices, for
22/// collecting `Box<dyn Fn()>` which do not implement [`Hash`].
23#[derive(Derivative)]
24#[derivative(Default(bound = ""))]
25struct IndexedSet<T> {
26    set: HashMap<usize, T>,
27    gen_: usize,
28}
29
30impl<T> IndexedSet<T> {
31    fn insert(&mut self, v: T) -> usize {
32        let key = self.gen_;
33        if self.set.insert(key, v).is_some() {
34            tracing::warn!("Collision");
35        };
36
37        self.gen_ += 1;
38        key
39    }
40
41    fn remove(&mut self, key: usize) {
42        self.set.remove(&key);
43    }
44
45    fn iter(&self) -> impl Iterator<Item = &T> {
46        self.set.values()
47    }
48
49    fn drain(&mut self) -> impl Iterator<Item = T> {
50        let mut x = Box::default();
51        std::mem::swap(&mut self.set, &mut x);
52        x.into_values()
53    }
54}
55
56type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
57type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
58
59#[derive(Derivative)]
60#[derivative(Default(bound = ""))]
61struct PubSubInternal<T: Clone> {
62    deleted: Cell<bool>,
63    listeners: RefCell<ListenerSet<T>>,
64    once_listeners: RefCell<ListenerOnceSet<T>>,
65}
66
67impl<T: Clone> PubSubInternal<T> {
68    fn emit(&self, val: T) {
69        if self.deleted.get() {
70            tracing::warn!("`Callback` invoked after `PubSub` dropped");
71        }
72
73        for listener in self.listeners.borrow().iter() {
74            listener(val.clone());
75        }
76
77        for listener in self.once_listeners.borrow_mut().drain() {
78            listener(val.clone());
79        }
80    }
81}
82
/// A pub/sub struct which allows many listeners to subscribe to many
/// publishers, without leaking callbacks as listeners are dropped.
///
/// Unlike `mpsc` etc., `PubSub` has no internal queue and is completely
/// synchronous: emitting a value invokes the registered listeners before
/// returning. It explicitly does not implement `Clone`, as this value is
/// intended as the RAII owner; dropping it flags the shared state as deleted.
/// The state itself lives behind an `Rc` so that it can be handed to the
/// callbacks, subscribers and subscriptions created from this `PubSub`.
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
93
/// An extension trait providing [`AddListener::add_listener`], which we want
/// to take a variety of function-like arguments for readability. The type
/// parameter `T` is the type of the listener being registered.
pub trait AddListener<T> {
    /// Register a listener to this `PubSub<_>`, which will be automatically
    /// deregistered when the returned `Subscription` is dropped.
    ///
    /// # Arguments
    /// - `f` The callback, presumably a function-like type.
    fn add_listener(&self, f: T) -> Subscription;
}
104
105impl<T: Clone + 'static> PubSub<T> {
106    /// Emit a value to all listeners.
107    ///
108    /// # Arguments
109    /// - `val` The value to emit.
110    pub fn emit(&self, val: T) {
111        self.0.emit(val);
112    }
113
114    /// Get this `PubSub<_>`'s `.emit_all()` method as a `Callback<T>`.
115    #[must_use]
116    pub fn callback(&self) -> Callback<T> {
117        let internal = self.0.clone();
118        Callback::from(move |val: T| internal.emit(val))
119    }
120
121    /// Subscribe a `Callback<()>` that fires whenever this PubSub emits,
122    /// discarding the emitted value.  Useful when the subscriber only
123    /// needs a "something changed" notification.
124    pub fn add_notify_listener(&self, cb: &Callback<()>) -> Subscription {
125        let cb = cb.clone();
126        self.add_listener(move |_: T| cb.emit(()))
127    }
128
129    /// Convert [`PubSub::emit`] to a `Box<dyn Fn(T)>`.
130    #[must_use]
131    pub fn as_boxfn(&self) -> Box<dyn Fn(T) + 'static> {
132        let internal = PubSub(self.0.clone());
133        Box::new(move |val: T| internal.emit(val))
134    }
135
136    /// Await this `PubSub<_>`'s next call to `emit_all()`, once.
137    pub async fn read_next(&self) -> Result<T, Canceled> {
138        let (sender, receiver) = channel::<T>();
139        let f = move |x| sender.send(x).unwrap_or(());
140        self.0.once_listeners.borrow_mut().insert(Box::new(f));
141        receiver.await
142    }
143
144    /// Create a `Subscriber` from this `PubSub`, which is the reciprocal of
145    /// `PubSub::callback` (a struct which only allows sending), a struct which
146    /// only allows receiving via `Subscriber::add_listener`.
147    #[must_use]
148    pub fn subscriber(&self) -> Subscriber<T> {
149        Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
150    }
151}
152
153impl<T: Clone> Drop for PubSub<T> {
154    fn drop(&mut self) {
155        self.0.deleted.set(true);
156    }
157}
158
159impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
160    fn add_listener(&self, f: Callback<T>) -> Subscription {
161        let internal = self.0.clone();
162        let cb = Box::new(move |x| f.emit(x));
163        let key = self.0.listeners.borrow_mut().insert(cb);
164        Subscription(Box::new(move || {
165            internal.listeners.borrow_mut().remove(key)
166        }))
167    }
168}
169
170impl<T, U> AddListener<U> for PubSub<T>
171where
172    T: Clone + 'static,
173    U: Fn(T) + 'static,
174{
175    fn add_listener(&self, f: U) -> Subscription {
176        let internal = self.0.clone();
177        let key = self.0.listeners.borrow_mut().insert(Box::new(f));
178        Subscription(Box::new(move || {
179            internal.listeners.borrow_mut().remove(key)
180        }))
181    }
182}
183
184/// Like a `PubSub` without `PubSub::emit`; the reciprocal of
185/// `PubSub::callback`. `Subscriber` does not keep the parent `PubSub` alive.
186#[derive(Clone)]
187pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
188
189impl<T, U> AddListener<U> for Subscriber<T>
190where
191    T: Clone + 'static,
192    U: Fn(T) + 'static,
193{
194    fn add_listener(&self, f: U) -> Subscription {
195        if let Some(internal) = self.0.upgrade() {
196            let key = internal.listeners.borrow_mut().insert(Box::new(f));
197            Subscription(Box::new(move || {
198                internal.listeners.borrow_mut().remove(key)
199            }))
200        } else {
201            Subscription(Box::new(|| {}))
202        }
203    }
204}
205
206impl<T: Clone> Default for Subscriber<T> {
207    fn default() -> Self {
208        Self(Weak::new())
209    }
210}
211
212impl<T: Clone> PartialEq for Subscriber<T> {
213    fn eq(&self, other: &Self) -> bool {
214        match (self.0.upgrade(), other.0.upgrade()) {
215            (Some(x), Some(y)) => std::ptr::eq(
216                &*x as *const PubSubInternal<T>,
217                &*y as *const PubSubInternal<T>,
218            ),
219            (None, None) => true,
220            _ => false,
221        }
222    }
223}
224
/// Manages the lifetime of a listener registered to a `PubSub<T>` by
/// deregistering the associated listener when dropped.
///
/// The wrapped `Fn` of a `Subscription` is the deregister closure provided
/// by the issuing `PubSub<T>`.
#[must_use]
pub struct Subscription(Box<dyn Fn()>);

impl Drop for Subscription {
    /// Run the deregister closure.
    fn drop(&mut self) {
        (self.0)();
    }
}