dioxus_core/
reactive_context.rs

1use crate::{current_scope_id, scope_context::Scope, tasks::SchedulerMsg, Runtime, ScopeId};
2use futures_channel::mpsc::UnboundedReceiver;
3use generational_box::{BorrowMutError, GenerationalBox, SyncStorage};
4use std::{
5    cell::RefCell,
6    collections::HashSet,
7    hash::Hash,
8    sync::{Arc, Mutex},
9};
10
#[doc = include_str!("../docs/reactivity.md")]
#[derive(Clone, Copy)]
pub struct ReactiveContext {
    // The scope this context was created for; returned by `origin_scope`.
    scope: ScopeId,
    // Copyable handle to the shared `Inner` state (update callback and subscription lists).
    // Equality and hashing of a `ReactiveContext` are based on this handle only.
    inner: GenerationalBox<Inner, SyncStorage>,
}
17
18impl PartialEq for ReactiveContext {
19    fn eq(&self, other: &Self) -> bool {
20        self.inner.ptr_eq(&other.inner)
21    }
22}
23
24impl Eq for ReactiveContext {}
25
thread_local! {
    // Per-thread stack of the reactive contexts that are currently running.
    // `ReactiveContext::run_in` pushes a context for the duration of its closure and
    // `ReactiveContext::current` reads the top (most recently pushed) entry.
    static CURRENT: RefCell<Vec<ReactiveContext>> = const { RefCell::new(vec![]) };
}
29
30impl std::fmt::Display for ReactiveContext {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        #[cfg(debug_assertions)]
33        {
34            if let Ok(read) = self.inner.try_read() {
35                if let Some(scope) = read.scope {
36                    return write!(f, "ReactiveContext(for scope: {:?})", scope);
37                }
38                return write!(f, "ReactiveContext created at {}", read.origin);
39            }
40        }
41        write!(f, "ReactiveContext")
42    }
43}
44
45impl ReactiveContext {
46    /// Create a new reactive context
47    #[track_caller]
48    pub fn new() -> (Self, UnboundedReceiver<()>) {
49        Self::new_with_origin(std::panic::Location::caller())
50    }
51
52    /// Create a new reactive context with a location for debugging purposes
53    /// This is useful for reactive contexts created within closures
54    pub fn new_with_origin(
55        origin: &'static std::panic::Location<'static>,
56    ) -> (Self, UnboundedReceiver<()>) {
57        let (tx, rx) = futures_channel::mpsc::unbounded();
58        let callback = move || {
59            // If there is already an update queued, we don't need to queue another
60            if !tx.is_empty() {
61                return;
62            }
63            let _ = tx.unbounded_send(());
64        };
65        let _self = Self::new_with_callback(
66            callback,
67            current_scope_id().unwrap_or_else(|e| panic!("{}", e)),
68            origin,
69        );
70        (_self, rx)
71    }
72
73    /// Create a new reactive context that may update a scope. When any signal that this context subscribes to changes, the callback will be run
74    pub fn new_with_callback(
75        callback: impl FnMut() + Send + Sync + 'static,
76        scope: ScopeId,
77        #[allow(unused)] origin: &'static std::panic::Location<'static>,
78    ) -> Self {
79        let inner = Inner {
80            self_: None,
81            update: Box::new(callback),
82            subscribers: Default::default(),
83            #[cfg(debug_assertions)]
84            origin,
85            #[cfg(debug_assertions)]
86            scope: None,
87        };
88
89        let owner = scope.owner();
90
91        let self_ = Self {
92            scope,
93            inner: owner.insert(inner),
94        };
95
96        self_.inner.write().self_ = Some(self_);
97
98        self_
99    }
100
101    /// Get the current reactive context from the nearest reactive hook or scope
102    pub fn current() -> Option<Self> {
103        CURRENT.with(|current| current.borrow().last().cloned())
104    }
105
106    /// Create a reactive context for a scope id
107    pub(crate) fn new_for_scope(scope: &Scope, runtime: &Runtime) -> Self {
108        let id = scope.id;
109        let sender = runtime.sender.clone();
110        let update_scope = move || {
111            tracing::trace!("Marking scope {:?} as dirty", id);
112            sender.unbounded_send(SchedulerMsg::Immediate(id)).unwrap();
113        };
114
115        // Otherwise, create a new context at the current scope
116        let inner = Inner {
117            self_: None,
118            update: Box::new(update_scope),
119            subscribers: Default::default(),
120            #[cfg(debug_assertions)]
121            origin: std::panic::Location::caller(),
122            #[cfg(debug_assertions)]
123            scope: Some(id),
124        };
125
126        let owner = scope.owner();
127
128        let self_ = Self {
129            scope: id,
130            inner: owner.insert(inner),
131        };
132
133        self_.inner.write().self_ = Some(self_);
134
135        self_
136    }
137
138    /// Clear all subscribers to this context
139    pub fn clear_subscribers(&self) {
140        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
141        #[allow(clippy::mutable_key_type)]
142        let old_subscribers = std::mem::take(&mut self.inner.write().subscribers);
143        for subscriber in old_subscribers {
144            subscriber.0.lock().unwrap().remove(self);
145        }
146    }
147
148    /// Update the subscribers
149    pub(crate) fn update_subscribers(&self) {
150        #[allow(clippy::mutable_key_type)]
151        let subscribers = &self.inner.read().subscribers;
152        for subscriber in subscribers.iter() {
153            subscriber.0.lock().unwrap().insert(*self);
154        }
155    }
156
    /// Reset the reactive context and then run the callback in the context. This can be used to create custom reactive hooks like `use_memo`.
    ///
    /// Resetting clears the subscriptions recorded so far (see [`ReactiveContext::clear_subscribers`]) before
    /// the callback runs through [`ReactiveContext::run_in`]. The value returned by the callback is returned unchanged.
    ///
    /// ```rust, no_run
    /// # use dioxus::prelude::*;
    /// # use futures_util::StreamExt;
    /// fn use_simplified_memo(mut closure: impl FnMut() -> i32 + 'static) -> Signal<i32> {
    ///     use_hook(|| {
    ///         // Create a new reactive context and a channel that receives a value every time something the reactive context subscribes to changes
    ///         let (reactive_context, mut changed) = ReactiveContext::new();
    ///         // Compute the value of the memo inside the reactive context. This will subscribe the reactive context to any values you read inside the closure
    ///         let value = reactive_context.reset_and_run_in(&mut closure);
    ///         // Create a new signal with the value of the memo
    ///         let mut signal = Signal::new(value);
    ///         // Create a task that reruns the closure when the reactive context changes
    ///         spawn(async move {
    ///             while changed.next().await.is_some() {
    ///                 // Since we reset the reactive context as we run the closure, our memo will only subscribe to the new values that are read in the closure
    ///                 let new_value = reactive_context.reset_and_run_in(&mut closure);
    ///                 if new_value != value {
    ///                     signal.set(new_value);
    ///                 }
    ///             }
    ///         });
    ///         signal
    ///     })
    /// }
    ///
    /// let mut boolean = use_signal(|| false);
    /// let mut count = use_signal(|| 0);
    /// // Because we use `reset_and_run_in` instead of just `run_in`, our memo will only subscribe to the signals that are read in this run of the closure (initially just the boolean)
    /// let memo = use_simplified_memo(move || if boolean() { count() } else { 0 });
    /// println!("{memo}");
    /// // Because the count signal is not read in this run of the closure, the memo will not rerun
    /// count += 1;
    /// println!("{memo}");
    /// // Because the boolean signal is read in this run of the closure, the memo will rerun
    /// boolean.toggle();
    /// println!("{memo}");
    /// // If we toggle the boolean again, the memo unsubscribes from the count signal
    /// boolean.toggle();
    /// println!("{memo}");
    /// ```
    pub fn reset_and_run_in<O>(&self, f: impl FnOnce() -> O) -> O {
        // Clear first so that only subscriptions made during this run of `f` remain afterwards.
        self.clear_subscribers();
        self.run_in(f)
    }
203
204    /// Run this function in the context of this reactive context
205    ///
206    /// This will set the current reactive context to this context for the duration of the function.
207    /// You can then get information about the current subscriptions.
208    pub fn run_in<O>(&self, f: impl FnOnce() -> O) -> O {
209        CURRENT.with(|current| current.borrow_mut().push(*self));
210        let out = f();
211        CURRENT.with(|current| current.borrow_mut().pop());
212        self.update_subscribers();
213        out
214    }
215
216    /// Marks this reactive context as dirty
217    ///
218    /// If there's a scope associated with this context, then it will be marked as dirty too
219    ///
220    /// Returns true if the context was marked as dirty, or false if the context has been dropped
221    pub fn mark_dirty(&self) -> bool {
222        if let Ok(mut self_write) = self.inner.try_write() {
223            #[cfg(debug_assertions)]
224            {
225                tracing::trace!(
226                    "Marking reactive context created at {} as dirty",
227                    self_write.origin
228                );
229            }
230
231            (self_write.update)();
232
233            true
234        } else {
235            false
236        }
237    }
238
239    /// Subscribe to this context. The reactive context will automatically remove itself from the subscriptions when it is reset.
240    pub fn subscribe(&self, subscriptions: Arc<Mutex<HashSet<ReactiveContext>>>) {
241        match self.inner.try_write() {
242            Ok(mut inner) => {
243                subscriptions.lock().unwrap().insert(*self);
244                inner.subscribers.insert(PointerHash(subscriptions));
245            }
246            // If the context was dropped, we don't need to subscribe to it anymore
247            Err(BorrowMutError::Dropped(_)) => {}
248            Err(expect) => {
249                panic!(
250                    "Expected to be able to write to reactive context to subscribe, but it failed with: {expect:?}"
251                );
252            }
253        }
254    }
255
256    /// Get the scope that inner CopyValue is associated with
257    pub fn origin_scope(&self) -> ScopeId {
258        self.scope
259    }
260}
261
262impl Hash for ReactiveContext {
263    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
264        self.inner.id().hash(state);
265    }
266}
267
// Wrapper that gives an `Arc` identity semantics: two values are equal (and hash alike)
// exactly when they point at the same allocation, whatever the pointee contains.
struct PointerHash<T>(Arc<T>);

impl<T> Hash for PointerHash<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Hash the address of the shared allocation, not the value stored in it.
        let address: *const T = Arc::as_ptr(&self.0);
        address.hash(state);
    }
}

impl<T> PartialEq for PointerHash<T> {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl<T> Eq for PointerHash<T> {}

// Written by hand so that cloning does not require `T: Clone`; only the `Arc` is cloned.
impl<T> Clone for PointerHash<T> {
    fn clone(&self) -> Self {
        let Self(shared) = self;
        Self(Arc::clone(shared))
    }
}
289
// A mutex-protected set of reactive contexts. Despite the name this is a `HashSet`, not a
// map; `Inner::subscribers` stores pointer-hashed `Arc`s of this type.
type SubscriberMap = Mutex<HashSet<ReactiveContext>>;
291
// State shared by every copy of a `ReactiveContext` handle.
struct Inner {
    // Handle pointing back at this state. The constructors set it right after inserting the
    // state, and `Drop` uses it to remove the context from the subscription lists below.
    self_: Option<ReactiveContext>,

    // Callback run by `ReactiveContext::mark_dirty`
    update: Box<dyn FnMut() + Send + Sync>,

    // Subscription lists this context has been added to, keyed by `Arc` pointer identity
    subscribers: HashSet<PointerHash<SubscriberMap>>,

    // Debug information for signal subscriptions: where the context was created
    #[cfg(debug_assertions)]
    origin: &'static std::panic::Location<'static>,

    #[cfg(debug_assertions)]
    // The scope that this reactive context is associated with. `new_for_scope` sets it to
    // `Some`, `new_with_callback` leaves it as `None`; it is only read when formatting.
    scope: Option<ScopeId>,
}
309
310impl Drop for Inner {
311    fn drop(&mut self) {
312        let Some(self_) = self.self_.take() else {
313            return;
314        };
315
316        for subscriber in std::mem::take(&mut self.subscribers) {
317            if let Ok(mut subscriber) = subscriber.0.lock() {
318                subscriber.remove(&self_);
319            }
320        }
321    }
322}