perspective_viewer/utils/
pubsub.rs1use std::cell::{Cell, RefCell};
14use std::collections::HashMap;
15use std::rc::{Rc, Weak};
16
17use derivative::Derivative;
18use futures::channel::oneshot::*;
19use yew::prelude::*;
20
21#[derive(Derivative)]
24#[derivative(Default(bound = ""))]
25struct IndexedSet<T> {
26 set: HashMap<usize, T>,
27 gen_: usize,
28}
29
30impl<T> IndexedSet<T> {
31 fn insert(&mut self, v: T) -> usize {
32 let key = self.gen_;
33 if self.set.insert(key, v).is_some() {
34 tracing::warn!("Collision");
35 };
36
37 self.gen_ += 1;
38 key
39 }
40
41 fn remove(&mut self, key: usize) {
42 self.set.remove(&key);
43 }
44
45 fn iter(&self) -> impl Iterator<Item = &T> {
46 self.set.values()
47 }
48
49 fn drain(&mut self) -> impl Iterator<Item = T> {
50 let mut x = Box::default();
51 std::mem::swap(&mut self.set, &mut x);
52 x.into_values()
53 }
54}
55
56type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
57type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
58
59#[derive(Derivative)]
60#[derivative(Default(bound = ""))]
61struct PubSubInternal<T: Clone> {
62 deleted: Cell<bool>,
63 listeners: RefCell<ListenerSet<T>>,
64 once_listeners: RefCell<ListenerOnceSet<T>>,
65}
66
67impl<T: Clone> PubSubInternal<T> {
68 fn emit(&self, val: T) {
69 if self.deleted.get() {
70 tracing::warn!("`Callback` invoked after `PubSub` dropped");
71 }
72
73 for listener in self.listeners.borrow().iter() {
74 listener(val.clone());
75 }
76
77 for listener in self.once_listeners.borrow_mut().drain() {
78 listener(val.clone());
79 }
80 }
81}
82
83#[derive(Derivative)]
89#[derivative(Default(bound = ""))]
90pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
91
92pub trait AddListener<T> {
95 fn add_listener(&self, f: T) -> Subscription;
101}
102
103impl<T: Clone + 'static> PubSub<T> {
104 pub fn emit(&self, val: T) {
109 self.0.emit(val);
110 }
111
112 #[must_use]
114 pub fn callback(&self) -> Callback<T> {
115 let internal = self.0.clone();
116 Callback::from(move |val: T| internal.emit(val))
117 }
118
119 #[must_use]
121 pub fn as_boxfn(&self) -> Box<dyn Fn(T) + 'static> {
122 let internal = PubSub(self.0.clone());
123 Box::new(move |val: T| internal.emit(val))
124 }
125
126 pub async fn read_next(&self) -> Result<T, Canceled> {
128 let (sender, receiver) = channel::<T>();
129 let f = move |x| sender.send(x).unwrap_or(());
130 self.0.once_listeners.borrow_mut().insert(Box::new(f));
131 receiver.await
132 }
133
134 #[must_use]
138 pub fn subscriber(&self) -> Subscriber<T> {
139 Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
140 }
141}
142
143impl<T: Clone> Drop for PubSub<T> {
144 fn drop(&mut self) {
145 self.0.deleted.set(true);
146 }
147}
148
149impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
150 fn add_listener(&self, f: Callback<T>) -> Subscription {
151 let internal = self.0.clone();
152 let cb = Box::new(move |x| f.emit(x));
153 let key = self.0.listeners.borrow_mut().insert(cb);
154 Subscription(Box::new(move || {
155 internal.listeners.borrow_mut().remove(key)
156 }))
157 }
158}
159
160impl<T, U> AddListener<U> for PubSub<T>
161where
162 T: Clone + 'static,
163 U: Fn(T) + 'static,
164{
165 fn add_listener(&self, f: U) -> Subscription {
166 let internal = self.0.clone();
167 let key = self.0.listeners.borrow_mut().insert(Box::new(f));
168 Subscription(Box::new(move || {
169 internal.listeners.borrow_mut().remove(key)
170 }))
171 }
172}
173
174#[derive(Clone)]
177pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
178
179impl<T, U> AddListener<U> for Subscriber<T>
180where
181 T: Clone + 'static,
182 U: Fn(T) + 'static,
183{
184 fn add_listener(&self, f: U) -> Subscription {
185 if let Some(internal) = self.0.upgrade() {
186 let key = internal.listeners.borrow_mut().insert(Box::new(f));
187 Subscription(Box::new(move || {
188 internal.listeners.borrow_mut().remove(key)
189 }))
190 } else {
191 Subscription(Box::new(|| {}))
192 }
193 }
194}
195
196impl<T: Clone> Default for Subscriber<T> {
197 fn default() -> Self {
198 Self(Weak::new())
199 }
200}
201
202impl<T: Clone> PartialEq for Subscriber<T> {
203 fn eq(&self, other: &Self) -> bool {
204 match (self.0.upgrade(), other.0.upgrade()) {
205 (Some(x), Some(y)) => std::ptr::eq(
206 &*x as *const PubSubInternal<T>,
207 &*y as *const PubSubInternal<T>,
208 ),
209 (None, None) => true,
210 _ => false,
211 }
212 }
213}
214
/// A guard wrapping a callback which is run when the guard is dropped (see
/// the `Drop` impl). Marked `#[must_use]` because discarding the value
/// drops it immediately.
#[must_use]
pub struct Subscription(Box<dyn Fn()>);
222
223impl Drop for Subscription {
224 fn drop(&mut self) {
225 (*self.0)();
226 }
227}