perspective_viewer/utils/
pubsub.rs1use std::cell::{Cell, RefCell};
14use std::collections::HashMap;
15use std::rc::{Rc, Weak};
16
17use derivative::Derivative;
18use futures::channel::oneshot::*;
19use yew::prelude::*;
20
21#[derive(Derivative)]
25#[derivative(Default(bound = ""))]
26struct IndexedSet<T> {
27 set: HashMap<usize, T>,
28 gen_: usize,
29}
30
31impl<T> IndexedSet<T> {
32 fn insert(&mut self, v: T) -> usize {
33 let key = self.gen_;
34 self.set.insert(key, v);
35 self.gen_ += 1;
36 key
37 }
38
39 fn remove(&mut self, key: usize) {
40 self.set.remove(&key);
41 }
42
43 fn iter(&self) -> impl Iterator<Item = &T> {
44 self.set.values()
45 }
46
47 fn drain(&mut self) -> impl Iterator<Item = T> {
48 let mut x = Box::default();
49 std::mem::swap(&mut self.set, &mut x);
50 x.into_values()
51 }
52}
53
54type ListenerSet<T> = IndexedSet<Box<dyn Fn(T) + 'static>>;
55type ListenerOnceSet<T> = IndexedSet<Box<dyn FnOnce(T) + 'static>>;
56
57#[derive(Derivative)]
58#[derivative(Default(bound = ""))]
59pub struct PubSubInternal<T: Clone> {
60 deleted: Cell<bool>,
61 listeners: RefCell<ListenerSet<T>>,
62 once_listeners: RefCell<ListenerOnceSet<T>>,
63}
64
65impl<T: Clone> PubSubInternal<T> {
66 fn emit(&self, val: T) {
67 if self.deleted.get() {
68 tracing::warn!("`Callback` invoked after `PubSub` dropped");
69 }
70
71 for listener in self.listeners.borrow().iter() {
72 listener(val.clone());
73 }
74
75 for listener in self.once_listeners.borrow_mut().drain() {
76 listener(val.clone());
77 }
78 }
79}
80
81#[derive(Derivative)]
87#[derivative(Default(bound = ""))]
88pub struct PubSub<T: Clone>(Rc<PubSubInternal<T>>);
89
90unsafe impl<T: Clone> Send for PubSub<T> {}
91unsafe impl<T: Clone> Sync for PubSub<T> {}
92
93pub trait AddListener<T> {
94 fn add_listener(&self, f: T) -> Subscription;
100}
101
102impl<T: Clone + 'static> PubSub<T> {
103 pub fn emit(&self, val: T) {
108 self.0.emit(val);
109 }
110
111 pub fn callback(&self) -> Callback<T> {
113 let internal = self.0.clone();
114 Callback::from(move |val: T| internal.emit(val))
115 }
116
117 pub fn as_boxfn(&self) -> Box<dyn Fn(T) + Send + Sync + 'static> {
118 let internal = PubSub(self.0.clone());
119 Box::new(move |val: T| internal.emit(val))
120 }
121
122 pub async fn listen_once(&self) -> Result<T, Canceled> {
124 let (sender, receiver) = channel::<T>();
125 let f = move |x| sender.send(x).unwrap_or(());
126 self.0.once_listeners.borrow_mut().insert(Box::new(f));
127 receiver.await
128 }
129
130 pub fn subscriber(&self) -> Subscriber<T> {
134 Subscriber(Rc::<PubSubInternal<T>>::downgrade(&self.0))
135 }
136}
137
138impl<T: Clone> Drop for PubSub<T> {
139 fn drop(&mut self) {
140 self.0.deleted.set(true);
141 }
142}
143
144impl<T: Clone + 'static> AddListener<Callback<T>> for PubSub<T> {
145 fn add_listener(&self, f: Callback<T>) -> Subscription {
146 let internal = self.0.clone();
147 let cb = Box::new(move |x| f.emit(x));
148 let key = self.0.listeners.borrow_mut().insert(cb);
149 Subscription(Box::new(move || {
150 internal.listeners.borrow_mut().remove(key)
151 }))
152 }
153}
154
155impl<T, U> AddListener<U> for PubSub<T>
156where
157 T: Clone + 'static,
158 U: Fn(T) + 'static,
159{
160 fn add_listener(&self, f: U) -> Subscription {
161 let internal = self.0.clone();
162 let key = self.0.listeners.borrow_mut().insert(Box::new(f));
163 Subscription(Box::new(move || {
164 internal.listeners.borrow_mut().remove(key)
165 }))
166 }
167}
168
169#[derive(Clone)]
172pub struct Subscriber<T: Clone>(Weak<PubSubInternal<T>>);
173
174impl<T, U> AddListener<U> for Subscriber<T>
175where
176 T: Clone + 'static,
177 U: Fn(T) + 'static,
178{
179 fn add_listener(&self, f: U) -> Subscription {
180 if let Some(internal) = self.0.upgrade() {
181 let key = internal.listeners.borrow_mut().insert(Box::new(f));
182 Subscription(Box::new(move || {
183 internal.listeners.borrow_mut().remove(key)
184 }))
185 } else {
186 Subscription(Box::new(|| {}))
187 }
188 }
189}
190
191impl<T: Clone> Default for Subscriber<T> {
192 fn default() -> Self {
193 Self(Weak::new())
194 }
195}
196
197impl<T: Clone> PartialEq for Subscriber<T> {
198 fn eq(&self, other: &Self) -> bool {
199 match (self.0.upgrade(), other.0.upgrade()) {
200 (Some(x), Some(y)) => std::ptr::eq(
201 &*x as *const PubSubInternal<T>,
202 &*y as *const PubSubInternal<T>,
203 ),
204 (None, None) => true,
205 _ => false,
206 }
207 }
208}
209
/// Guard for a listener registration.
///
/// It stores a clean-up closure and runs it exactly once, when the guard is
/// dropped. Because discarding the value immediately would run the clean-up
/// right away, the type is marked `#[must_use]`.
#[must_use]
pub struct Subscription(Box<dyn Fn()>);

impl Drop for Subscription {
    /// Runs the stored clean-up closure.
    fn drop(&mut self) {
        let cleanup = &self.0;
        cleanup();
    }
}
222}