perspective_viewer/utils/
pubsub.rs1use std::cell::{Cell, RefCell};
14use std::collections::HashMap;
15use std::rc::{Rc, Weak};
16
17use derivative::Derivative;
18use futures::channel::oneshot::*;
19use yew::prelude::*;
20
21#[derive(Derivative)]
24#[derivative(Default(bound = ""))]
25struct IndexedSet<T> {
26 set: HashMap<usize, T>,
27 gen_: usize,
28}
29
30impl<T> IndexedSet<T> {
31 fn insert(&mut self, v: T) -> usize {
32 let key = self.gen_;
33 if self.set.insert(key, v).is_some() {
34 tracing::warn!("Collision");
35 };
36
37 self.gen_ += 1;
38 key
39 }
40
41 fn remove(&mut self, key: usize) {
42 self.set.remove(&key);
43 }
44
45 fn iter(&self) -> impl Iterator<Item = &T> {
46 self.set.values()
47 }
48
49 fn drain(&mut self) -> impl Iterator<Item = T> {
50 let mut x = Box::default();
51 std::mem::swap(&mut self.set, &mut x);
52 x.into_values()
53 }
54}
55
56type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
57type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
58
59#[derive(Derivative)]
60#[derivative(Default(bound = ""))]
61struct PubSubInternal<T: Clone> {
62 deleted: Cell<bool>,
63 listeners: RefCell<ListenerSet<T>>,
64 once_listeners: RefCell<ListenerOnceSet<T>>,
65}
66
67impl<T: Clone> PubSubInternal<T> {
68 fn emit(&self, val: T) {
69 if self.deleted.get() {
70 tracing::warn!("`Callback` invoked after `PubSub` dropped");
71 }
72
73 for listener in self.listeners.borrow().iter() {
74 listener(val.clone());
75 }
76
77 for listener in self.once_listeners.borrow_mut().drain() {
78 listener(val.clone());
79 }
80 }
81}
82
83#[derive(Derivative)]
91#[derivative(Default(bound = ""))]
92pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
93
94pub trait AddListener<T> {
97 fn add_listener(&self, f: T) -> Subscription;
103}
104
105impl<T: Clone + 'static> PubSub<T> {
106 pub fn emit(&self, val: T) {
111 self.0.emit(val);
112 }
113
114 #[must_use]
116 pub fn callback(&self) -> Callback<T> {
117 let internal = self.0.clone();
118 Callback::from(move |val: T| internal.emit(val))
119 }
120
121 pub fn add_notify_listener(&self, cb: &Callback<()>) -> Subscription {
125 let cb = cb.clone();
126 self.add_listener(move |_: T| cb.emit(()))
127 }
128
129 #[must_use]
131 pub fn as_boxfn(&self) -> Box<dyn Fn(T) + 'static> {
132 let internal = PubSub(self.0.clone());
133 Box::new(move |val: T| internal.emit(val))
134 }
135
136 pub async fn read_next(&self) -> Result<T, Canceled> {
138 let (sender, receiver) = channel::<T>();
139 let f = move |x| sender.send(x).unwrap_or(());
140 self.0.once_listeners.borrow_mut().insert(Box::new(f));
141 receiver.await
142 }
143
144 #[must_use]
148 pub fn subscriber(&self) -> Subscriber<T> {
149 Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
150 }
151}
152
153impl<T: Clone> Drop for PubSub<T> {
154 fn drop(&mut self) {
155 self.0.deleted.set(true);
156 }
157}
158
159impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
160 fn add_listener(&self, f: Callback<T>) -> Subscription {
161 let internal = self.0.clone();
162 let cb = Box::new(move |x| f.emit(x));
163 let key = self.0.listeners.borrow_mut().insert(cb);
164 Subscription(Box::new(move || {
165 internal.listeners.borrow_mut().remove(key)
166 }))
167 }
168}
169
170impl<T, U> AddListener<U> for PubSub<T>
171where
172 T: Clone + 'static,
173 U: Fn(T) + 'static,
174{
175 fn add_listener(&self, f: U) -> Subscription {
176 let internal = self.0.clone();
177 let key = self.0.listeners.borrow_mut().insert(Box::new(f));
178 Subscription(Box::new(move || {
179 internal.listeners.borrow_mut().remove(key)
180 }))
181 }
182}
183
184#[derive(Clone)]
187pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
188
189impl<T, U> AddListener<U> for Subscriber<T>
190where
191 T: Clone + 'static,
192 U: Fn(T) + 'static,
193{
194 fn add_listener(&self, f: U) -> Subscription {
195 if let Some(internal) = self.0.upgrade() {
196 let key = internal.listeners.borrow_mut().insert(Box::new(f));
197 Subscription(Box::new(move || {
198 internal.listeners.borrow_mut().remove(key)
199 }))
200 } else {
201 Subscription(Box::new(|| {}))
202 }
203 }
204}
205
206impl<T: Clone> Default for Subscriber<T> {
207 fn default() -> Self {
208 Self(Weak::new())
209 }
210}
211
212impl<T: Clone> PartialEq for Subscriber<T> {
213 fn eq(&self, other: &Self) -> bool {
214 match (self.0.upgrade(), other.0.upgrade()) {
215 (Some(x), Some(y)) => std::ptr::eq(
216 &*x as *const PubSubInternal<T>,
217 &*y as *const PubSubInternal<T>,
218 ),
219 (None, None) => true,
220 _ => false,
221 }
222 }
223}
224
225#[must_use]
231pub struct Subscription(Box<dyn Fn()>);
232
233impl Drop for Subscription {
234 fn drop(&mut self) {
235 (*self.0)();
236 }
237}