reactive_graph/graph/
subscriber.rs1use super::{node::ReactiveNode, AnySource};
2#[cfg(debug_assertions)]
3use crate::diagnostics::SpecialNonReactiveZone;
4use core::{fmt::Debug, hash::Hash};
5use std::{cell::RefCell, mem, sync::Weak};
6
7thread_local! {
8 static OBSERVER: RefCell<Option<ObserverState>> = const { RefCell::new(None) };
9}
10
11#[derive(Debug)]
12struct ObserverState {
13 subscriber: AnySubscriber,
14 untracked: bool,
15}
16
/// Handle for the current thread's reactive observer.
///
/// This is a field-less type: all functionality lives in associated functions
/// that read or swap a thread-local slot.
pub struct Observer;
23
24#[derive(Debug)]
25struct SetObserverOnDrop(Option<AnySubscriber>);
26
27impl Drop for SetObserverOnDrop {
28 fn drop(&mut self) {
29 Observer::set(self.0.take());
30 }
31}
32
33impl Observer {
34 pub fn get() -> Option<AnySubscriber> {
36 OBSERVER.with_borrow(|obs| {
37 obs.as_ref().and_then(|obs| {
38 if obs.untracked {
39 None
40 } else {
41 Some(obs.subscriber.clone())
42 }
43 })
44 })
45 }
46
47 pub(crate) fn is(observer: &AnySubscriber) -> bool {
48 OBSERVER.with_borrow(|o| {
49 o.as_ref().map(|o| &o.subscriber) == Some(observer)
50 })
51 }
52
53 fn take() -> SetObserverOnDrop {
54 SetObserverOnDrop(
55 OBSERVER.with_borrow_mut(Option::take).map(|o| o.subscriber),
56 )
57 }
58
59 fn set(observer: Option<AnySubscriber>) {
60 OBSERVER.with_borrow_mut(|o| {
61 *o = observer.map(|subscriber| ObserverState {
62 subscriber,
63 untracked: false,
64 })
65 });
66 }
67
68 fn replace(observer: Option<AnySubscriber>) -> SetObserverOnDrop {
69 SetObserverOnDrop(
70 OBSERVER
71 .with(|o| {
72 mem::replace(
73 &mut *o.borrow_mut(),
74 observer.map(|subscriber| ObserverState {
75 subscriber,
76 untracked: false,
77 }),
78 )
79 })
80 .map(|o| o.subscriber),
81 )
82 }
83
84 fn replace_untracked(observer: Option<AnySubscriber>) -> SetObserverOnDrop {
85 SetObserverOnDrop(
86 OBSERVER
87 .with(|o| {
88 mem::replace(
89 &mut *o.borrow_mut(),
90 observer.map(|subscriber| ObserverState {
91 subscriber,
92 untracked: true,
93 }),
94 )
95 })
96 .map(|o| o.subscriber),
97 )
98 }
99}
100
101#[track_caller]
130pub fn untrack<T>(fun: impl FnOnce() -> T) -> T {
131 #[cfg(debug_assertions)]
132 let _warning_guard = crate::diagnostics::SpecialNonReactiveZone::enter();
133
134 let _prev = Observer::take();
135 fun()
136}
137
138#[doc(hidden)]
139#[track_caller]
140pub fn untrack_with_diagnostics<T>(fun: impl FnOnce() -> T) -> T {
141 let _prev = Observer::take();
142 fun()
143}
144
145pub trait ToAnySubscriber {
147 fn to_any_subscriber(&self) -> AnySubscriber;
149}
150
151pub trait Subscriber: ReactiveNode {
153 fn add_source(&self, source: AnySource);
155
156 fn clear_sources(&self, subscriber: &AnySubscriber);
158}
159
160#[derive(Clone)]
162pub struct AnySubscriber(pub usize, pub Weak<dyn Subscriber + Send + Sync>);
163
164impl ToAnySubscriber for AnySubscriber {
165 fn to_any_subscriber(&self) -> AnySubscriber {
166 self.clone()
167 }
168}
169
170impl Subscriber for AnySubscriber {
171 fn add_source(&self, source: AnySource) {
172 if let Some(inner) = self.1.upgrade() {
173 inner.add_source(source);
174 }
175 }
176
177 fn clear_sources(&self, subscriber: &AnySubscriber) {
178 if let Some(inner) = self.1.upgrade() {
179 inner.clear_sources(subscriber);
180 }
181 }
182}
183
184impl ReactiveNode for AnySubscriber {
185 fn mark_dirty(&self) {
186 if let Some(inner) = self.1.upgrade() {
187 inner.mark_dirty()
188 }
189 }
190
191 fn mark_subscribers_check(&self) {
192 if let Some(inner) = self.1.upgrade() {
193 inner.mark_subscribers_check()
194 }
195 }
196
197 fn update_if_necessary(&self) -> bool {
198 if let Some(inner) = self.1.upgrade() {
199 inner.update_if_necessary()
200 } else {
201 false
202 }
203 }
204
205 fn mark_check(&self) {
206 if let Some(inner) = self.1.upgrade() {
207 inner.mark_check()
208 }
209 }
210}
211
/// Runs a closure while a given value is installed as the current thread's
/// observer.
pub trait WithObserver {
    /// Calls `fun` with this value installed as the observer and returns the
    /// closure's result.
    fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T;

    /// Calls `fun` with this value installed as the observer, but flagged as
    /// untracked, and returns the closure's result.
    fn with_observer_untracked<T>(&self, fun: impl FnOnce() -> T) -> T;
}
221
222impl WithObserver for AnySubscriber {
223 fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
224 let _prev = Observer::replace(Some(self.clone()));
225 fun()
226 }
227
228 fn with_observer_untracked<T>(&self, fun: impl FnOnce() -> T) -> T {
229 #[cfg(debug_assertions)]
230 let _guard = SpecialNonReactiveZone::enter();
231 let _prev = Observer::replace_untracked(Some(self.clone()));
232 fun()
233 }
234}
235
236impl WithObserver for Option<AnySubscriber> {
237 fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
238 let _prev = Observer::replace(self.clone());
239 fun()
240 }
241
242 fn with_observer_untracked<T>(&self, fun: impl FnOnce() -> T) -> T {
243 #[cfg(debug_assertions)]
244 let _guard = SpecialNonReactiveZone::enter();
245 let _prev = Observer::replace_untracked(self.clone());
246 fun()
247 }
248}
249
250impl Debug for AnySubscriber {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 f.debug_tuple("AnySubscriber").field(&self.0).finish()
253 }
254}
255
256impl Hash for AnySubscriber {
257 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
258 self.0.hash(state);
259 }
260}
261
262impl PartialEq for AnySubscriber {
263 fn eq(&self, other: &Self) -> bool {
264 self.0 == other.0
265 }
266}
267
268impl Eq for AnySubscriber {}