reactive_graph/graph/
subscriber.rs

1use super::{node::ReactiveNode, AnySource};
2#[cfg(debug_assertions)]
3use crate::diagnostics::SpecialNonReactiveZone;
4use core::{fmt::Debug, hash::Hash};
5use std::{cell::RefCell, mem, sync::Weak};
6
7thread_local! {
8    static OBSERVER: RefCell<Option<ObserverState>> = const { RefCell::new(None) };
9}
10
11#[derive(Debug)]
12struct ObserverState {
13    subscriber: AnySubscriber,
14    untracked: bool,
15}
16
/// Handle to the reactive observer of the current thread.
///
/// The observer is the reactive node that is, at this moment, listening for signal reads
/// that should be tracked. While an effect is running, for instance, that effect is the
/// observer, so it subscribes to changes in every signal that gets read.
pub struct Observer;
23
24#[derive(Debug)]
25struct SetObserverOnDrop(Option<AnySubscriber>);
26
27impl Drop for SetObserverOnDrop {
28    fn drop(&mut self) {
29        Observer::set(self.0.take());
30    }
31}
32
33impl Observer {
34    /// Returns the current observer, if any.
35    pub fn get() -> Option<AnySubscriber> {
36        OBSERVER.with_borrow(|obs| {
37            obs.as_ref().and_then(|obs| {
38                if obs.untracked {
39                    None
40                } else {
41                    Some(obs.subscriber.clone())
42                }
43            })
44        })
45    }
46
47    pub(crate) fn is(observer: &AnySubscriber) -> bool {
48        OBSERVER.with_borrow(|o| {
49            o.as_ref().map(|o| &o.subscriber) == Some(observer)
50        })
51    }
52
53    fn take() -> SetObserverOnDrop {
54        SetObserverOnDrop(
55            OBSERVER.with_borrow_mut(Option::take).map(|o| o.subscriber),
56        )
57    }
58
59    fn set(observer: Option<AnySubscriber>) {
60        OBSERVER.with_borrow_mut(|o| {
61            *o = observer.map(|subscriber| ObserverState {
62                subscriber,
63                untracked: false,
64            })
65        });
66    }
67
68    fn replace(observer: Option<AnySubscriber>) -> SetObserverOnDrop {
69        SetObserverOnDrop(
70            OBSERVER
71                .with(|o| {
72                    mem::replace(
73                        &mut *o.borrow_mut(),
74                        observer.map(|subscriber| ObserverState {
75                            subscriber,
76                            untracked: false,
77                        }),
78                    )
79                })
80                .map(|o| o.subscriber),
81        )
82    }
83
84    fn replace_untracked(observer: Option<AnySubscriber>) -> SetObserverOnDrop {
85        SetObserverOnDrop(
86            OBSERVER
87                .with(|o| {
88                    mem::replace(
89                        &mut *o.borrow_mut(),
90                        observer.map(|subscriber| ObserverState {
91                            subscriber,
92                            untracked: true,
93                        }),
94                    )
95                })
96                .map(|o| o.subscriber),
97        )
98    }
99}
100
101/// Suspends reactive tracking while running the given function.
102///
103/// This can be used to isolate parts of the reactive graph from one another.
104///
105/// ```rust
106/// # use reactive_graph::computed::*;
107/// # use reactive_graph::signal::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
108/// # use reactive_graph::prelude::*;
109/// # use reactive_graph::graph::untrack;
110/// # tokio_test::block_on(async move {
111/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
112/// let (a, set_a) = signal(0);
113/// let (b, set_b) = signal(0);
114/// let c = Memo::new(move |_| {
115///     // this memo will *only* update when `a` changes
116///     a.get() + untrack(move || b.get())
117/// });
118///
119/// assert_eq!(c.get(), 0);
120/// set_a.set(1);
121/// assert_eq!(c.get(), 1);
122/// set_b.set(1);
123/// // hasn't updated, because we untracked before reading b
124/// assert_eq!(c.get(), 1);
125/// set_a.set(2);
126/// assert_eq!(c.get(), 3);
127/// # });
128/// ```
129#[track_caller]
130pub fn untrack<T>(fun: impl FnOnce() -> T) -> T {
131    #[cfg(debug_assertions)]
132    let _warning_guard = crate::diagnostics::SpecialNonReactiveZone::enter();
133
134    let _prev = Observer::take();
135    fun()
136}
137
138#[doc(hidden)]
139#[track_caller]
140pub fn untrack_with_diagnostics<T>(fun: impl FnOnce() -> T) -> T {
141    let _prev = Observer::take();
142    fun()
143}
144
145/// Converts a [`Subscriber`] to a type-erased [`AnySubscriber`].
146pub trait ToAnySubscriber {
147    /// Converts this type to its type-erased equivalent.
148    fn to_any_subscriber(&self) -> AnySubscriber;
149}
150
151/// Any type that can track reactive values (like an effect or a memo).
152pub trait Subscriber: ReactiveNode {
153    /// Adds a subscriber to this subscriber's list of dependencies.
154    fn add_source(&self, source: AnySource);
155
156    /// Clears the set of sources for this subscriber.
157    fn clear_sources(&self, subscriber: &AnySubscriber);
158}
159
160/// A type-erased subscriber.
161#[derive(Clone)]
162pub struct AnySubscriber(pub usize, pub Weak<dyn Subscriber + Send + Sync>);
163
164impl ToAnySubscriber for AnySubscriber {
165    fn to_any_subscriber(&self) -> AnySubscriber {
166        self.clone()
167    }
168}
169
170impl Subscriber for AnySubscriber {
171    fn add_source(&self, source: AnySource) {
172        if let Some(inner) = self.1.upgrade() {
173            inner.add_source(source);
174        }
175    }
176
177    fn clear_sources(&self, subscriber: &AnySubscriber) {
178        if let Some(inner) = self.1.upgrade() {
179            inner.clear_sources(subscriber);
180        }
181    }
182}
183
184impl ReactiveNode for AnySubscriber {
185    fn mark_dirty(&self) {
186        if let Some(inner) = self.1.upgrade() {
187            inner.mark_dirty()
188        }
189    }
190
191    fn mark_subscribers_check(&self) {
192        if let Some(inner) = self.1.upgrade() {
193            inner.mark_subscribers_check()
194        }
195    }
196
197    fn update_if_necessary(&self) -> bool {
198        if let Some(inner) = self.1.upgrade() {
199            inner.update_if_necessary()
200        } else {
201            false
202        }
203    }
204
205    fn mark_check(&self) {
206        if let Some(inner) = self.1.upgrade() {
207            inner.mark_check()
208        }
209    }
210}
211
/// Runs code while some subscriber is installed as the thread-local [`Observer`].
pub trait WithObserver {
    /// Calls `fun` with this subscriber installed as the thread-local [`Observer`] and
    /// returns its result.
    fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T;

    /// Calls `fun` with this subscriber installed as the thread-local [`Observer`], but
    /// without tracking dependencies, and returns its result.
    fn with_observer_untracked<T>(&self, fun: impl FnOnce() -> T) -> T;
}
221
222impl WithObserver for AnySubscriber {
223    fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
224        let _prev = Observer::replace(Some(self.clone()));
225        fun()
226    }
227
228    fn with_observer_untracked<T>(&self, fun: impl FnOnce() -> T) -> T {
229        #[cfg(debug_assertions)]
230        let _guard = SpecialNonReactiveZone::enter();
231        let _prev = Observer::replace_untracked(Some(self.clone()));
232        fun()
233    }
234}
235
236impl WithObserver for Option<AnySubscriber> {
237    fn with_observer<T>(&self, fun: impl FnOnce() -> T) -> T {
238        let _prev = Observer::replace(self.clone());
239        fun()
240    }
241
242    fn with_observer_untracked<T>(&self, fun: impl FnOnce() -> T) -> T {
243        #[cfg(debug_assertions)]
244        let _guard = SpecialNonReactiveZone::enter();
245        let _prev = Observer::replace_untracked(self.clone());
246        fun()
247    }
248}
249
250impl Debug for AnySubscriber {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        f.debug_tuple("AnySubscriber").field(&self.0).finish()
253    }
254}
255
256impl Hash for AnySubscriber {
257    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
258        self.0.hash(state);
259    }
260}
261
262impl PartialEq for AnySubscriber {
263    fn eq(&self, other: &Self) -> bool {
264        self.0 == other.0
265    }
266}
267
268impl Eq for AnySubscriber {}