Skip to main content

perspective_viewer/utils/
pubsub.rs

1// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
2// ┃ ██████ ██████ ██████       █      █      █      █      █ █▄  ▀███ █       ┃
3// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█  ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄  ▀█ █ ▀▀▀▀▀ ┃
4// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄   █ ▄▄▄▄▄ ┃
5// ┃ █      ██████ █  ▀█▄       █ ██████      █      ███▌▐███ ███████▄ █       ┃
6// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
7// ┃ Copyright (c) 2017, the Perspective Authors.                              ┃
8// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
9// ┃ This file is part of the Perspective library, distributed under the terms ┃
10// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
11// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
12
13use std::cell::{Cell, RefCell};
14use std::collections::HashMap;
15use std::rc::{Rc, Weak};
16
17use derivative::Derivative;
18use futures::channel::oneshot::*;
19use yew::prelude::*;
20
21/// A simple collection with lookup and remove with stable indices, for
22/// collecting `Box<dyn Fn()>` which do not implement [`Hash`].
23#[derive(Derivative)]
24#[derivative(Default(bound = ""))]
25struct IndexedSet<T> {
26    set: HashMap<usize, T>,
27    gen_: usize,
28}
29
30impl<T> IndexedSet<T> {
31    fn insert(&mut self, v: T) -> usize {
32        let key = self.gen_;
33        if self.set.insert(key, v).is_some() {
34            tracing::warn!("Collision");
35        };
36
37        self.gen_ += 1;
38        key
39    }
40
41    fn remove(&mut self, key: usize) {
42        self.set.remove(&key);
43    }
44
45    fn iter(&self) -> impl Iterator<Item = &T> {
46        self.set.values()
47    }
48
49    fn drain(&mut self) -> impl Iterator<Item = T> {
50        let mut x = Box::default();
51        std::mem::swap(&mut self.set, &mut x);
52        x.into_values()
53    }
54}
55
56type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
57type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
58
59#[derive(Derivative)]
60#[derivative(Default(bound = ""))]
61struct PubSubInternal<T: Clone> {
62    deleted: Cell<bool>,
63    listeners: RefCell<ListenerSet<T>>,
64    once_listeners: RefCell<ListenerOnceSet<T>>,
65}
66
67impl<T: Clone> PubSubInternal<T> {
68    fn emit(&self, val: T) {
69        if self.deleted.get() {
70            tracing::warn!("`Callback` invoked after `PubSub` dropped");
71        }
72
73        for listener in self.listeners.borrow().iter() {
74            listener(val.clone());
75        }
76
77        for listener in self.once_listeners.borrow_mut().drain() {
78            listener(val.clone());
79        }
80    }
81}
82
83/// A pub/sub struct which allows many listeners to subscribe to many
84/// publishers, without leaking callbacks as listeners are dropped.
85///
86/// Unlike `mpsc` etc., `PubSub` has no internal queue and is completely
87/// synchronous.
88#[derive(Derivative)]
89#[derivative(Default(bound = ""))]
90pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
91
92/// An extension trait for [`AddListener::add_listener`], which we want to
93/// take a variety of function-like arguments for readability.
94pub trait AddListener<T> {
95    /// Register a listener to this `PubSub<_>`, which will be automatically
96    /// deregistered when the return `Subscription` is dropped.
97    ///
98    /// # Arguments
99    /// - `f` The callback, presumably a function-like type.
100    fn add_listener(&self, f: T) -> Subscription;
101}
102
103impl<T: Clone + 'static> PubSub<T> {
104    /// Emit a value to all listeners.
105    ///
106    /// # Arguments
107    /// - `val` The value to emit.
108    pub fn emit(&self, val: T) {
109        self.0.emit(val);
110    }
111
112    /// Get this `PubSub<_>`'s `.emit_all()` method as a `Callback<T>`.
113    #[must_use]
114    pub fn callback(&self) -> Callback<T> {
115        let internal = self.0.clone();
116        Callback::from(move |val: T| internal.emit(val))
117    }
118
119    /// Convert [`PubSub::emit`] to a `Box<dyn Fn(T)>`.
120    #[must_use]
121    pub fn as_boxfn(&self) -> Box<dyn Fn(T) + 'static> {
122        let internal = PubSub(self.0.clone());
123        Box::new(move |val: T| internal.emit(val))
124    }
125
126    /// Await this `PubSub<_>`'s next call to `emit_all()`, once.
127    pub async fn read_next(&self) -> Result<T, Canceled> {
128        let (sender, receiver) = channel::<T>();
129        let f = move |x| sender.send(x).unwrap_or(());
130        self.0.once_listeners.borrow_mut().insert(Box::new(f));
131        receiver.await
132    }
133
134    /// Create a `Subscriber` from this `PubSub`, which is the reciprocal of
135    /// `PubSub::callback` (a struct which only allows sending), a struct which
136    /// only allows receiving via `Subscriber::add_listener`.
137    #[must_use]
138    pub fn subscriber(&self) -> Subscriber<T> {
139        Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
140    }
141}
142
143impl<T: Clone> Drop for PubSub<T> {
144    fn drop(&mut self) {
145        self.0.deleted.set(true);
146    }
147}
148
149impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
150    fn add_listener(&self, f: Callback<T>) -> Subscription {
151        let internal = self.0.clone();
152        let cb = Box::new(move |x| f.emit(x));
153        let key = self.0.listeners.borrow_mut().insert(cb);
154        Subscription(Box::new(move || {
155            internal.listeners.borrow_mut().remove(key)
156        }))
157    }
158}
159
160impl<T, U> AddListener<U> for PubSub<T>
161where
162    T: Clone + 'static,
163    U: Fn(T) + 'static,
164{
165    fn add_listener(&self, f: U) -> Subscription {
166        let internal = self.0.clone();
167        let key = self.0.listeners.borrow_mut().insert(Box::new(f));
168        Subscription(Box::new(move || {
169            internal.listeners.borrow_mut().remove(key)
170        }))
171    }
172}
173
174/// Like a `PubSub` without `PubSub::emit`; the reciprocal of
175/// `PubSub::callback`. `Subscriber` does not keep the parent `PubSub` alive.
176#[derive(Clone)]
177pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
178
179impl<T, U> AddListener<U> for Subscriber<T>
180where
181    T: Clone + 'static,
182    U: Fn(T) + 'static,
183{
184    fn add_listener(&self, f: U) -> Subscription {
185        if let Some(internal) = self.0.upgrade() {
186            let key = internal.listeners.borrow_mut().insert(Box::new(f));
187            Subscription(Box::new(move || {
188                internal.listeners.borrow_mut().remove(key)
189            }))
190        } else {
191            Subscription(Box::new(|| {}))
192        }
193    }
194}
195
196impl<T: Clone> Default for Subscriber<T> {
197    fn default() -> Self {
198        Self(Weak::new())
199    }
200}
201
202impl<T: Clone> PartialEq for Subscriber<T> {
203    fn eq(&self, other: &Self) -> bool {
204        match (self.0.upgrade(), other.0.upgrade()) {
205            (Some(x), Some(y)) => std::ptr::eq(
206                &*x as *const PubSubInternal<T>,
207                &*y as *const PubSubInternal<T>,
208            ),
209            (None, None) => true,
210            _ => false,
211        }
212    }
213}
214
215/// Manages the lifetime of a listener registered to a `PubSub<T>` by
216/// deregistering the associated listener when dropped.
217///
218/// The wrapped `Fn` of `Subscriptions` is the deregister closure provided by
219/// the issuing `PubSub<T>`.
220#[must_use]
221pub struct Subscription(Box<dyn Fn()>);
222
223impl Drop for Subscription {
224    fn drop(&mut self) {
225        (*self.0)();
226    }
227}