dioxus_core/
reactive_context.rs

1use crate::{current_scope_id, scope_context::Scope, tasks::SchedulerMsg, Runtime, ScopeId};
2use futures_channel::mpsc::UnboundedReceiver;
3use generational_box::{BorrowMutError, GenerationalBox, SyncStorage};
4use std::{
5    cell::RefCell,
6    collections::HashSet,
7    hash::Hash,
8    sync::{Arc, Mutex},
9};
10
11#[doc = include_str!("../docs/reactivity.md")]
12#[derive(Clone, Copy)]
13pub struct ReactiveContext {
14    scope: ScopeId,
15    inner: GenerationalBox<Inner, SyncStorage>,
16}
17
18impl PartialEq for ReactiveContext {
19    fn eq(&self, other: &Self) -> bool {
20        self.inner.ptr_eq(&other.inner)
21    }
22}
23
24impl Eq for ReactiveContext {}
25
26thread_local! {
27    static CURRENT: RefCell<Vec<ReactiveContext>> = const { RefCell::new(vec![]) };
28}
29
30impl std::fmt::Display for ReactiveContext {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        #[cfg(debug_assertions)]
33        {
34            if let Ok(read) = self.inner.try_read() {
35                if let Some(scope) = read.scope {
36                    return write!(f, "ReactiveContext(for scope: {:?})", scope);
37                }
38                return write!(f, "ReactiveContext created at {}", read.origin);
39            }
40        }
41        write!(f, "ReactiveContext")
42    }
43}
44
45impl ReactiveContext {
46    /// Create a new reactive context
47    #[track_caller]
48    pub fn new() -> (Self, UnboundedReceiver<()>) {
49        Self::new_with_origin(std::panic::Location::caller())
50    }
51
52    /// Create a new reactive context with a location for debugging purposes
53    /// This is useful for reactive contexts created within closures
54    pub fn new_with_origin(
55        origin: &'static std::panic::Location<'static>,
56    ) -> (Self, UnboundedReceiver<()>) {
57        let (tx, rx) = futures_channel::mpsc::unbounded();
58        let callback = move || {
59            // If there is already an update queued, we don't need to queue another
60            if !tx.is_empty() {
61                return;
62            }
63            let _ = tx.unbounded_send(());
64        };
65        let _self = Self::new_with_callback(callback, current_scope_id(), origin);
66        (_self, rx)
67    }
68
69    /// Create a new reactive context that may update a scope. When any signal that this context subscribes to changes, the callback will be run
70    pub fn new_with_callback(
71        callback: impl FnMut() + Send + Sync + 'static,
72        scope: ScopeId,
73        #[allow(unused)] origin: &'static std::panic::Location<'static>,
74    ) -> Self {
75        let inner = Inner {
76            self_: None,
77            update: Box::new(callback),
78            subscribers: Default::default(),
79            #[cfg(debug_assertions)]
80            origin,
81            #[cfg(debug_assertions)]
82            scope: None,
83        };
84
85        let owner = Runtime::current().scope_owner(scope);
86
87        let self_ = Self {
88            scope,
89            inner: owner.insert(inner),
90        };
91
92        self_.inner.write().self_ = Some(self_);
93
94        self_
95    }
96
97    /// Get the current reactive context from the nearest reactive hook or scope
98    pub fn current() -> Option<Self> {
99        CURRENT.with(|current| current.borrow().last().cloned())
100    }
101
102    /// Create a reactive context for a scope id
103    pub(crate) fn new_for_scope(scope: &Scope, runtime: &Runtime) -> Self {
104        let id = scope.id;
105        let sender = runtime.sender.clone();
106        let update_scope = move || {
107            _ = sender.unbounded_send(SchedulerMsg::Immediate(id));
108        };
109
110        // Otherwise, create a new context at the current scope
111        let inner = Inner {
112            self_: None,
113            update: Box::new(update_scope),
114            subscribers: Default::default(),
115            #[cfg(debug_assertions)]
116            origin: std::panic::Location::caller(),
117            #[cfg(debug_assertions)]
118            scope: Some(id),
119        };
120
121        let owner = scope.owner();
122
123        let self_ = Self {
124            scope: id,
125            inner: owner.insert(inner),
126        };
127
128        self_.inner.write().self_ = Some(self_);
129
130        self_
131    }
132
133    /// Clear all subscribers to this context
134    pub fn clear_subscribers(&self) {
135        // The key type is mutable, but the hash is stable through mutations because we hash by pointer
136        #[allow(clippy::mutable_key_type)]
137        let old_subscribers = std::mem::take(&mut self.inner.write().subscribers);
138        for subscriber in old_subscribers {
139            subscriber.0.remove(self);
140        }
141    }
142
143    /// Update the subscribers
144    pub(crate) fn update_subscribers(&self) {
145        #[allow(clippy::mutable_key_type)]
146        let subscribers = &self.inner.read().subscribers;
147        for subscriber in subscribers.iter() {
148            subscriber.0.add(*self);
149        }
150    }
151
152    /// Reset the reactive context and then run the callback in the context. This can be used to create custom reactive hooks like `use_memo`.
153    ///
154    /// ```rust, no_run
155    /// # use dioxus::prelude::*;
156    /// # use dioxus_core::ReactiveContext;
157    /// # use futures_util::StreamExt;
158    /// fn use_simplified_memo(mut closure: impl FnMut() -> i32 + 'static) -> Signal<i32> {
159    ///     use_hook(|| {
160    ///         // Create a new reactive context and channel that will receive a value every time a value the reactive context subscribes to changes
161    ///         let (reactive_context, mut changed) = ReactiveContext::new();
162    ///         // Compute the value of the memo inside the reactive context. This will subscribe the reactive context to any values you read inside the closure
163    ///         let value = reactive_context.reset_and_run_in(&mut closure);
164    ///         // Create a new signal with the value of the memo
165    ///         let mut signal = Signal::new(value);
166    ///         // Create a task that reruns the closure when the reactive context changes
167    ///         spawn(async move {
168    ///             while changed.next().await.is_some() {
169    ///                 // Since we reset the reactive context as we run the closure, our memo will only subscribe to the new values that are read in the closure
170    ///                 let new_value = reactive_context.run_in(&mut closure);
171    ///                 if new_value != value {
172    ///                     signal.set(new_value);
173    ///                 }
174    ///             }
175    ///         });
176    ///         signal
177    ///     })
178    /// }
179    ///
180    /// let mut boolean = use_signal(|| false);
181    /// let mut count = use_signal(|| 0);
182    /// // Because we use `reset_and_run_in` instead of just `run_in`, our memo will only subscribe to the signals that are read this run of the closure (initially just the boolean)
183    /// let memo = use_simplified_memo(move || if boolean() { count() } else { 0 });
184    /// println!("{memo}");
185    /// // Because the count signal is not read in this run of the closure, the memo will not rerun
186    /// count += 1;
187    /// println!("{memo}");
188    /// // Because the boolean signal is read in this run of the closure, the memo will rerun
189    /// boolean.toggle();
190    /// println!("{memo}");
191    /// // If we toggle the boolean again, and the memo unsubscribes from the count signal
192    /// boolean.toggle();
193    /// println!("{memo}");
194    /// ```
195    pub fn reset_and_run_in<O>(&self, f: impl FnOnce() -> O) -> O {
196        self.clear_subscribers();
197        self.run_in(f)
198    }
199
200    /// Run this function in the context of this reactive context
201    ///
202    /// This will set the current reactive context to this context for the duration of the function.
203    /// You can then get information about the current subscriptions.
204    pub fn run_in<O>(&self, f: impl FnOnce() -> O) -> O {
205        CURRENT.with(|current| current.borrow_mut().push(*self));
206        let out = f();
207        CURRENT.with(|current| current.borrow_mut().pop());
208        self.update_subscribers();
209        out
210    }
211
212    /// Marks this reactive context as dirty
213    ///
214    /// If there's a scope associated with this context, then it will be marked as dirty too
215    ///
216    /// Returns true if the context was marked as dirty, or false if the context has been dropped
217    pub fn mark_dirty(&self) -> bool {
218        if let Ok(mut self_write) = self.inner.try_write() {
219            #[cfg(debug_assertions)]
220            {
221                tracing::trace!(
222                    "Marking reactive context created at {} as dirty",
223                    self_write.origin
224                );
225            }
226
227            (self_write.update)();
228
229            true
230        } else {
231            false
232        }
233    }
234
235    /// Subscribe to this context. The reactive context will automatically remove itself from the subscriptions when it is reset.
236    pub fn subscribe(&self, subscriptions: impl Into<Subscribers>) {
237        match self.inner.try_write() {
238            Ok(mut inner) => {
239                let subscriptions = subscriptions.into();
240                subscriptions.add(*self);
241                inner
242                    .subscribers
243                    .insert(PointerHash(subscriptions.inner.clone()));
244            }
245            // If the context was dropped, we don't need to subscribe to it anymore
246            Err(BorrowMutError::Dropped(_)) => {}
247            Err(expect) => {
248                panic!(
249                    "Expected to be able to write to reactive context to subscribe, but it failed with: {expect:?}"
250                );
251            }
252        }
253    }
254
255    /// Get the scope that inner CopyValue is associated with
256    pub fn origin_scope(&self) -> ScopeId {
257        self.scope
258    }
259}
260
261impl Hash for ReactiveContext {
262    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
263        self.inner.id().hash(state);
264    }
265}
266
267struct PointerHash<T: ?Sized>(Arc<T>);
268
269impl<T: ?Sized> Hash for PointerHash<T> {
270    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
271        std::sync::Arc::<T>::as_ptr(&self.0).hash(state);
272    }
273}
274
275impl<T: ?Sized> PartialEq for PointerHash<T> {
276    fn eq(&self, other: &Self) -> bool {
277        std::sync::Arc::ptr_eq(&self.0, &other.0)
278    }
279}
280
281impl<T: ?Sized> Eq for PointerHash<T> {}
282
283impl<T: ?Sized> Clone for PointerHash<T> {
284    fn clone(&self) -> Self {
285        Self(self.0.clone())
286    }
287}
288
289struct Inner {
290    self_: Option<ReactiveContext>,
291
292    // Futures will call .changed().await
293    update: Box<dyn FnMut() + Send + Sync>,
294
295    // Subscribers to this context
296    subscribers: HashSet<PointerHash<dyn SubscriberList + Send + Sync>>,
297
298    // Debug information for signal subscriptions
299    #[cfg(debug_assertions)]
300    origin: &'static std::panic::Location<'static>,
301
302    #[cfg(debug_assertions)]
303    // The scope that this reactive context is associated with
304    scope: Option<ScopeId>,
305}
306
307impl Drop for Inner {
308    fn drop(&mut self) {
309        let Some(self_) = self.self_.take() else {
310            return;
311        };
312
313        for subscriber in std::mem::take(&mut self.subscribers) {
314            subscriber.0.remove(&self_);
315        }
316    }
317}
318
319/// A list of [ReactiveContext]s that are subscribed. This is used to notify subscribers when the value changes.
320#[derive(Clone)]
321pub struct Subscribers {
322    /// The list of subscribers.
323    pub(crate) inner: Arc<dyn SubscriberList + Send + Sync>,
324}
325
326impl Default for Subscribers {
327    fn default() -> Self {
328        Self::new()
329    }
330}
331
332impl Subscribers {
333    /// Create a new no-op list of subscribers.
334    pub fn new_noop() -> Self {
335        struct NoopSubscribers;
336        impl SubscriberList for NoopSubscribers {
337            fn add(&self, _subscriber: ReactiveContext) {}
338
339            fn remove(&self, _subscriber: &ReactiveContext) {}
340
341            fn visit(&self, _f: &mut dyn FnMut(&ReactiveContext)) {}
342        }
343        Subscribers {
344            inner: Arc::new(NoopSubscribers),
345        }
346    }
347
348    /// Create a new list of subscribers.
349    pub fn new() -> Self {
350        Subscribers {
351            inner: Arc::new(Mutex::new(HashSet::new())),
352        }
353    }
354
355    /// Add a subscriber to the list.
356    pub fn add(&self, subscriber: ReactiveContext) {
357        self.inner.add(subscriber);
358    }
359
360    /// Remove a subscriber from the list.
361    pub fn remove(&self, subscriber: &ReactiveContext) {
362        self.inner.remove(subscriber);
363    }
364
365    /// Visit all subscribers in the list.
366    pub fn visit(&self, mut f: impl FnMut(&ReactiveContext)) {
367        self.inner.visit(&mut f);
368    }
369}
370
371impl<S: SubscriberList + Send + Sync + 'static> From<Arc<S>> for Subscribers {
372    fn from(inner: Arc<S>) -> Self {
373        Subscribers { inner }
374    }
375}
376
377/// A list of subscribers that can be notified when the value changes. This is used to track when the value changes and notify subscribers.
378pub trait SubscriberList: Send + Sync {
379    /// Add a subscriber to the list.
380    fn add(&self, subscriber: ReactiveContext);
381
382    /// Remove a subscriber from the list.
383    fn remove(&self, subscriber: &ReactiveContext);
384
385    /// Visit all subscribers in the list.
386    fn visit(&self, f: &mut dyn FnMut(&ReactiveContext));
387}
388
389impl SubscriberList for Mutex<HashSet<ReactiveContext>> {
390    fn add(&self, subscriber: ReactiveContext) {
391        if let Ok(mut lock) = self.lock() {
392            lock.insert(subscriber);
393        } else {
394            tracing::warn!("Failed to lock subscriber list to add subscriber: {subscriber}");
395        }
396    }
397
398    fn remove(&self, subscriber: &ReactiveContext) {
399        if let Ok(mut lock) = self.lock() {
400            lock.remove(subscriber);
401        } else {
402            tracing::warn!("Failed to lock subscriber list to remove subscriber: {subscriber}");
403        }
404    }
405
406    fn visit(&self, f: &mut dyn FnMut(&ReactiveContext)) {
407        if let Ok(lock) = self.lock() {
408            lock.iter().for_each(f);
409        } else {
410            tracing::warn!("Failed to lock subscriber list to visit subscribers");
411        }
412    }
413}