// dioxus_core/reactive_context.rs

1use crate::{
2    prelude::{current_scope_id, ScopeId},
3    scope_context::Scope,
4    tasks::SchedulerMsg,
5    Runtime,
6};
7use futures_channel::mpsc::UnboundedReceiver;
8use generational_box::{BorrowMutError, GenerationalBox, SyncStorage};
9use std::{
10    cell::RefCell,
11    collections::HashSet,
12    hash::Hash,
13    sync::{Arc, Mutex},
14};
15
16#[doc = include_str!("../docs/reactivity.md")]
17#[derive(Clone, Copy)]
18pub struct ReactiveContext {
19    scope: ScopeId,
20    inner: GenerationalBox<Inner, SyncStorage>,
21}
22
23impl PartialEq for ReactiveContext {
24    fn eq(&self, other: &Self) -> bool {
25        self.inner.ptr_eq(&other.inner)
26    }
27}
28
29impl Eq for ReactiveContext {}
30
31thread_local! {
32    static CURRENT: RefCell<Vec<ReactiveContext>> = const { RefCell::new(vec![]) };
33}
34
35impl std::fmt::Display for ReactiveContext {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        #[cfg(debug_assertions)]
38        {
39            if let Ok(read) = self.inner.try_read() {
40                if let Some(scope) = read.scope {
41                    return write!(f, "ReactiveContext(for scope: {:?})", scope);
42                }
43                return write!(f, "ReactiveContext created at {}", read.origin);
44            }
45        }
46        write!(f, "ReactiveContext")
47    }
48}
49
50impl ReactiveContext {
51    /// Create a new reactive context
52    #[track_caller]
53    pub fn new() -> (Self, UnboundedReceiver<()>) {
54        Self::new_with_origin(std::panic::Location::caller())
55    }
56
57    /// Create a new reactive context with a location for debugging purposes
58    /// This is useful for reactive contexts created within closures
59    pub fn new_with_origin(
60        origin: &'static std::panic::Location<'static>,
61    ) -> (Self, UnboundedReceiver<()>) {
62        let (tx, rx) = futures_channel::mpsc::unbounded();
63        let callback = move || {
64            // If there is already an update queued, we don't need to queue another
65            if !tx.is_empty() {
66                return;
67            }
68            let _ = tx.unbounded_send(());
69        };
70        let _self = Self::new_with_callback(
71            callback,
72            current_scope_id().unwrap_or_else(|e| panic!("{}", e)),
73            origin,
74        );
75        (_self, rx)
76    }
77
78    /// Create a new reactive context that may update a scope. When any signal that this context subscribes to changes, the callback will be run
79    pub fn new_with_callback(
80        callback: impl FnMut() + Send + Sync + 'static,
81        scope: ScopeId,
82        #[allow(unused)] origin: &'static std::panic::Location<'static>,
83    ) -> Self {
84        let inner = Inner {
85            self_: None,
86            update: Box::new(callback),
87            subscribers: Default::default(),
88            #[cfg(debug_assertions)]
89            origin,
90            #[cfg(debug_assertions)]
91            scope: None,
92        };
93
94        let owner = scope.owner();
95
96        let self_ = Self {
97            scope,
98            inner: owner.insert(inner),
99        };
100
101        self_.inner.write().self_ = Some(self_);
102
103        self_
104    }
105
106    /// Get the current reactive context from the nearest reactive hook or scope
107    pub fn current() -> Option<Self> {
108        CURRENT.with(|current| current.borrow().last().cloned())
109    }
110
111    /// Create a reactive context for a scope id
112    pub(crate) fn new_for_scope(scope: &Scope, runtime: &Runtime) -> Self {
113        let id = scope.id;
114        let sender = runtime.sender.clone();
115        let update_scope = move || {
116            tracing::trace!("Marking scope {:?} as dirty", id);
117            sender.unbounded_send(SchedulerMsg::Immediate(id)).unwrap();
118        };
119
120        // Otherwise, create a new context at the current scope
121        let inner = Inner {
122            self_: None,
123            update: Box::new(update_scope),
124            subscribers: Default::default(),
125            #[cfg(debug_assertions)]
126            origin: std::panic::Location::caller(),
127            #[cfg(debug_assertions)]
128            scope: Some(id),
129        };
130
131        let owner = scope.owner();
132
133        let self_ = Self {
134            scope: id,
135            inner: owner.insert(inner),
136        };
137
138        self_.inner.write().self_ = Some(self_);
139
140        self_
141    }
142
143    /// Clear all subscribers to this context
144    pub fn clear_subscribers(&self) {
145        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
146        #[allow(clippy::mutable_key_type)]
147        let old_subscribers = std::mem::take(&mut self.inner.write().subscribers);
148        for subscriber in old_subscribers {
149            subscriber.0.lock().unwrap().remove(self);
150        }
151    }
152
153    /// Update the subscribers
154    pub(crate) fn update_subscribers(&self) {
155        #[allow(clippy::mutable_key_type)]
156        let subscribers = &self.inner.read().subscribers;
157        for subscriber in subscribers.iter() {
158            subscriber.0.lock().unwrap().insert(*self);
159        }
160    }
161
162    /// Reset the reactive context and then run the callback in the context. This can be used to create custom reactive hooks like `use_memo`.
163    ///
164    /// ```rust, no_run
165    /// # use dioxus::prelude::*;
166    /// # use futures_util::StreamExt;
167    /// fn use_simplified_memo(mut closure: impl FnMut() -> i32 + 'static) -> Signal<i32> {
168    ///     use_hook(|| {
169    ///         // Create a new reactive context and channel that will receive a value every time a value the reactive context subscribes to changes
170    ///         let (reactive_context, mut changed) = ReactiveContext::new();
171    ///         // Compute the value of the memo inside the reactive context. This will subscribe the reactive context to any values you read inside the closure
172    ///         let value = reactive_context.reset_and_run_in(&mut closure);
173    ///         // Create a new signal with the value of the memo
174    ///         let mut signal = Signal::new(value);
175    ///         // Create a task that reruns the closure when the reactive context changes
176    ///         spawn(async move {
177    ///             while changed.next().await.is_some() {
178    ///                 // Since we reset the reactive context as we run the closure, our memo will only subscribe to the new values that are read in the closure
179    ///                 let new_value = reactive_context.run_in(&mut closure);
180    ///                 if new_value != value {
181    ///                     signal.set(new_value);
182    ///                 }
183    ///             }
184    ///         });
185    ///         signal
186    ///     })
187    /// }
188    ///
189    /// let mut boolean = use_signal(|| false);
190    /// let mut count = use_signal(|| 0);
191    /// // Because we use `reset_and_run_in` instead of just `run_in`, our memo will only subscribe to the signals that are read this run of the closure (initially just the boolean)
192    /// let memo = use_simplified_memo(move || if boolean() { count() } else { 0 });
193    /// println!("{memo}");
194    /// // Because the count signal is not read in this run of the closure, the memo will not rerun
195    /// count += 1;
196    /// println!("{memo}");
197    /// // Because the boolean signal is read in this run of the closure, the memo will rerun
198    /// boolean.toggle();
199    /// println!("{memo}");
200    /// // If we toggle the boolean again, and the memo unsubscribes from the count signal
201    /// boolean.toggle();
202    /// println!("{memo}");
203    /// ```
204    pub fn reset_and_run_in<O>(&self, f: impl FnOnce() -> O) -> O {
205        self.clear_subscribers();
206        self.run_in(f)
207    }
208
209    /// Run this function in the context of this reactive context
210    ///
211    /// This will set the current reactive context to this context for the duration of the function.
212    /// You can then get information about the current subscriptions.
213    pub fn run_in<O>(&self, f: impl FnOnce() -> O) -> O {
214        CURRENT.with(|current| current.borrow_mut().push(*self));
215        let out = f();
216        CURRENT.with(|current| current.borrow_mut().pop());
217        self.update_subscribers();
218        out
219    }
220
221    /// Marks this reactive context as dirty
222    ///
223    /// If there's a scope associated with this context, then it will be marked as dirty too
224    ///
225    /// Returns true if the context was marked as dirty, or false if the context has been dropped
226    pub fn mark_dirty(&self) -> bool {
227        if let Ok(mut self_write) = self.inner.try_write() {
228            #[cfg(debug_assertions)]
229            {
230                tracing::trace!(
231                    "Marking reactive context created at {} as dirty",
232                    self_write.origin
233                );
234            }
235
236            (self_write.update)();
237
238            true
239        } else {
240            false
241        }
242    }
243
244    /// Subscribe to this context. The reactive context will automatically remove itself from the subscriptions when it is reset.
245    pub fn subscribe(&self, subscriptions: Arc<Mutex<HashSet<ReactiveContext>>>) {
246        match self.inner.try_write() {
247            Ok(mut inner) => {
248                subscriptions.lock().unwrap().insert(*self);
249                inner.subscribers.insert(PointerHash(subscriptions));
250            }
251            // If the context was dropped, we don't need to subscribe to it anymore
252            Err(BorrowMutError::Dropped(_)) => {}
253            Err(expect) => {
254                panic!(
255                    "Expected to be able to write to reactive context to subscribe, but it failed with: {expect:?}"
256                );
257            }
258        }
259    }
260
261    /// Get the scope that inner CopyValue is associated with
262    pub fn origin_scope(&self) -> ScopeId {
263        self.scope
264    }
265}
266
267impl Hash for ReactiveContext {
268    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
269        self.inner.id().hash(state);
270    }
271}
272
// Wrapper that gives an `Arc<T>` identity semantics: two values are equal, and
// hash alike, exactly when they point at the same allocation. The contents of
// `T` are never inspected, so `T` needs no `Hash`/`Eq`/`Clone` of its own.
struct PointerHash<T>(Arc<T>);

impl<T> Hash for PointerHash<T> {
    fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) {
        // Hash the address of the allocation, not the value stored in it.
        std::ptr::hash(Arc::as_ptr(&self.0), hasher);
    }
}

impl<T> PartialEq for PointerHash<T> {
    fn eq(&self, rhs: &Self) -> bool {
        Arc::ptr_eq(&self.0, &rhs.0)
    }
}

impl<T> Eq for PointerHash<T> {}

impl<T> Clone for PointerHash<T> {
    fn clone(&self) -> Self {
        // Only the reference count is bumped; the clone shares the allocation.
        Self(Arc::clone(&self.0))
    }
}
294
295type SubscriberMap = Mutex<HashSet<ReactiveContext>>;
296
297struct Inner {
298    self_: Option<ReactiveContext>,
299
300    // Futures will call .changed().await
301    update: Box<dyn FnMut() + Send + Sync>,
302
303    // Subscribers to this context
304    subscribers: HashSet<PointerHash<SubscriberMap>>,
305
306    // Debug information for signal subscriptions
307    #[cfg(debug_assertions)]
308    origin: &'static std::panic::Location<'static>,
309
310    #[cfg(debug_assertions)]
311    // The scope that this reactive context is associated with
312    scope: Option<ScopeId>,
313}
314
315impl Drop for Inner {
316    fn drop(&mut self) {
317        let Some(self_) = self.self_.take() else {
318            return;
319        };
320
321        for subscriber in std::mem::take(&mut self.subscribers) {
322            if let Ok(mut subscriber) = subscriber.0.lock() {
323                subscriber.remove(&self_);
324            }
325        }
326    }
327}