1use crate::prelude::*;
2use smallvec::SmallVec;
3use std::{
4 any::Any,
5 cell::RefCell,
6 fmt::{Debug, Formatter},
7 rc::Rc,
8 sync::{Arc, Mutex},
9};
10
/// Common interface of everything that can be cancelled as a subscription.
///
/// The implementations in this file forward to a wrapped value or, for the
/// shared state type, flip a `closed` flag and cascade to registered
/// teardowns.
pub trait SubscriptionLike {
  /// Cancels the subscription.
  fn unsubscribe(&mut self);

  /// Reports whether the subscription has already been closed.
  fn is_closed(&self) -> bool;
}
20
21impl Debug for Box<dyn SubscriptionLike> {
22 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23 f.debug_struct("Box<dyn SubscriptionLike>")
24 .field("is_closed", &self.is_closed())
25 .finish()
26 }
27}
28
29#[derive(Clone, Debug, Default)]
30pub struct LocalSubscription(Rc<RefCell<Inner<Box<dyn SubscriptionLike>>>>);
31
32impl LocalSubscription {
33 pub fn add<S: SubscriptionLike + 'static>(&self, subscription: S) {
34 if !self.is_same(&subscription) {
35 self.0.borrow_mut().add(Box::new(subscription))
36 }
37 }
38
39 fn is_same(&self, other: &dyn Any) -> bool {
40 if let Some(other) = other.downcast_ref::<Self>() {
41 Rc::ptr_eq(&self.0, &other.0)
42 } else {
43 false
44 }
45 }
46}
47
48impl TearDownSize for LocalSubscription {
49 fn teardown_size(&self) -> usize { self.0.borrow().teardown.len() }
50}
51
52pub trait TearDownSize: SubscriptionLike {
53 fn teardown_size(&self) -> usize;
54}
55
56impl SubscriptionLike for LocalSubscription {
57 #[inline]
58 fn unsubscribe(&mut self) { self.0.unsubscribe() }
59 #[inline]
60 fn is_closed(&self) -> bool { self.0.is_closed() }
61}
62
63#[derive(Clone, Debug, Default)]
64pub struct SharedSubscription(
65 Arc<Mutex<Inner<Box<dyn SubscriptionLike + Send + Sync>>>>,
66);
67
68impl SharedSubscription {
69 pub fn add<S: SubscriptionLike + Send + Sync + 'static>(
70 &self,
71 subscription: S,
72 ) {
73 if !self.is_same(&subscription) {
74 self.0.lock().unwrap().add(Box::new(subscription));
75 }
76 }
77
78 fn is_same(&self, other: &dyn Any) -> bool {
79 if let Some(other) = other.downcast_ref::<Self>() {
80 Arc::ptr_eq(&self.0, &other.0)
81 } else {
82 false
83 }
84 }
85}
86
87impl TearDownSize for SharedSubscription {
88 fn teardown_size(&self) -> usize { self.0.lock().unwrap().teardown.len() }
89}
90
91impl SubscriptionLike for SharedSubscription {
92 #[inline]
93 fn unsubscribe(&mut self) { self.0.unsubscribe(); }
94 #[inline]
95 fn is_closed(&self) -> bool { self.0.is_closed() }
96}
97
98pub trait Publisher: Observer + SubscriptionLike {
99 #[inline]
100 fn is_finished(&self) -> bool { self.is_closed() || self.is_stopped() }
101}
102
103impl<T> Publisher for T where T: Observer + SubscriptionLike {}
104
105struct Inner<T> {
106 closed: bool,
107 teardown: SmallVec<[T; 1]>,
108}
109
110impl<T> Debug for Inner<T> {
111 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("Inner")
113 .field("closed", &self.closed)
114 .field("teardown_count", &self.teardown.len())
115 .finish()
116 }
117}
118
119impl<T: SubscriptionLike> SubscriptionLike for Inner<T> {
120 #[inline(always)]
121 fn is_closed(&self) -> bool { self.closed }
122
123 fn unsubscribe(&mut self) {
124 if !self.closed {
125 self.closed = true;
126 for v in &mut self.teardown {
127 v.unsubscribe();
128 }
129 }
130 }
131}
132
133impl<T: SubscriptionLike> Inner<T> {
134 fn add(&mut self, mut v: T) {
135 if self.closed {
136 v.unsubscribe();
137 } else {
138 self.teardown.retain(|v| !v.is_closed());
139 self.teardown.push(v);
140 }
141 }
142}
143
144impl<T> Default for Inner<T> {
145 fn default() -> Self {
146 Inner {
147 closed: false,
148 teardown: SmallVec::new(),
149 }
150 }
151}
152
153impl<T> SubscriptionLike for Arc<Mutex<T>>
154where
155 T: SubscriptionLike,
156{
157 #[inline]
158 fn unsubscribe(&mut self) { self.lock().unwrap().unsubscribe() }
159
160 #[inline]
161 fn is_closed(&self) -> bool { self.lock().unwrap().is_closed() }
162}
163
164impl<T> SubscriptionLike for Rc<RefCell<T>>
165where
166 T: SubscriptionLike,
167{
168 #[inline]
169 fn unsubscribe(&mut self) { self.borrow_mut().unsubscribe() }
170
171 #[inline]
172 fn is_closed(&self) -> bool { self.borrow().is_closed() }
173}
174
175impl<T: ?Sized> SubscriptionLike for Box<T>
176where
177 T: SubscriptionLike,
178{
179 #[inline]
180 fn unsubscribe(&mut self) {
181 let s = &mut **self;
182 s.unsubscribe()
183 }
184
185 #[inline]
186 fn is_closed(&self) -> bool {
187 let s = &**self;
188 s.is_closed()
189 }
190}
191
192pub struct SubscriptionWrapper<T: SubscriptionLike>(pub(crate) T);
195
196impl<T: SubscriptionLike> SubscriptionWrapper<T> {
197 pub fn unsubscribe_when_dropped(self) -> SubscriptionGuard<T> {
205 SubscriptionGuard(self.0)
206 }
207
208 pub fn into_inner(self) -> T { self.0 }
210}
211
212impl<T: SubscriptionLike> SubscriptionLike for SubscriptionWrapper<T> {
213 #[inline]
214 fn is_closed(&self) -> bool { self.0.is_closed() }
215 #[inline]
216 fn unsubscribe(&mut self) { self.0.unsubscribe() }
217}
218
219#[derive(Debug)]
230#[must_use]
231pub struct SubscriptionGuard<T: SubscriptionLike>(pub(crate) T);
232
233impl<T: SubscriptionLike> SubscriptionGuard<T> {
234 pub fn new(subscription: T) -> SubscriptionGuard<T> {
237 SubscriptionGuard(subscription)
238 }
239}
240
241impl<T: SubscriptionLike> Drop for SubscriptionGuard<T> {
242 #[inline]
243 fn drop(&mut self) { self.0.unsubscribe() }
244}
245
#[cfg(test)]
mod test {
  use super::*;

  // Adding distinct subscriptions grows the teardown list one by one.
  #[test]
  fn add_remove_for_local() {
    let local = LocalSubscription::default();
    let l1 = LocalSubscription::default();
    let l2 = LocalSubscription::default();
    let l3 = LocalSubscription::default();
    local.add(l1);
    assert_eq!(local.0.borrow().teardown.len(), 1);
    local.add(l2);
    assert_eq!(local.0.borrow().teardown.len(), 2);
    local.add(l3);
    assert_eq!(local.0.borrow().teardown.len(), 3);
  }

  // A teardown that was closed on its own is dropped by the next `add`.
  #[test]
  fn closed_teardown_is_pruned_for_local() {
    let parent = LocalSubscription::default();
    let mut child = LocalSubscription::default();
    parent.add(child.clone());
    assert_eq!(parent.teardown_size(), 1);

    child.unsubscribe();
    parent.add(LocalSubscription::default());
    assert_eq!(parent.teardown_size(), 1);
  }

  // Adding a handle to the subscription itself must be a no-op.
  #[test]
  fn self_add_is_ignored_for_local() {
    let local = LocalSubscription::default();
    local.add(local.clone());
    assert_eq!(local.teardown_size(), 0);
  }

  // Unsubscribing the parent closes its teardowns; adding to a closed parent
  // closes the new subscription immediately without storing it.
  #[test]
  fn unsubscribe_cascades_for_local() {
    let mut parent = LocalSubscription::default();
    let child = LocalSubscription::default();
    parent.add(child.clone());
    assert!(!child.is_closed());

    parent.unsubscribe();
    assert!(parent.is_closed());
    assert!(child.is_closed());

    let before = parent.teardown_size();
    let late = LocalSubscription::default();
    parent.add(late.clone());
    assert!(late.is_closed());
    assert_eq!(parent.teardown_size(), before);
  }

  // Adding distinct subscriptions grows the teardown list one by one.
  #[test]
  fn add_remove_for_shared() {
    let shared = SharedSubscription::default();
    let s1 = SharedSubscription::default();
    let s2 = SharedSubscription::default();
    let s3 = SharedSubscription::default();
    shared.add(s1);
    assert_eq!(shared.0.lock().unwrap().teardown.len(), 1);
    shared.add(s2);
    assert_eq!(shared.0.lock().unwrap().teardown.len(), 2);
    shared.add(s3);
    assert_eq!(shared.0.lock().unwrap().teardown.len(), 3);
  }

  // A teardown that was closed on its own is dropped by the next `add`.
  #[test]
  fn closed_teardown_is_pruned_for_shared() {
    let parent = SharedSubscription::default();
    let mut child = SharedSubscription::default();
    parent.add(child.clone());
    assert_eq!(parent.teardown_size(), 1);

    child.unsubscribe();
    parent.add(SharedSubscription::default());
    assert_eq!(parent.teardown_size(), 1);
  }

  // Adding a handle to the subscription itself must be a no-op.
  #[test]
  fn self_add_is_ignored_for_shared() {
    let shared = SharedSubscription::default();
    shared.add(shared.clone());
    assert_eq!(shared.teardown_size(), 0);
  }

  // Unsubscribing the parent closes its teardowns; adding to a closed parent
  // closes the new subscription immediately without storing it.
  #[test]
  fn unsubscribe_cascades_for_shared() {
    let mut parent = SharedSubscription::default();
    let child = SharedSubscription::default();
    parent.add(child.clone());
    assert!(!child.is_closed());

    parent.unsubscribe();
    assert!(parent.is_closed());
    assert!(child.is_closed());

    let before = parent.teardown_size();
    let late = SharedSubscription::default();
    parent.add(late.clone());
    assert!(late.is_closed());
    assert_eq!(parent.teardown_size(), before);
  }
}