dioxus_core/
reactive_context.rs1use crate::{current_scope_id, scope_context::Scope, tasks::SchedulerMsg, Runtime, ScopeId};
2use futures_channel::mpsc::UnboundedReceiver;
3use generational_box::{BorrowMutError, GenerationalBox, SyncStorage};
4use std::{
5 cell::RefCell,
6 collections::HashSet,
7 hash::Hash,
8 sync::{Arc, Mutex},
9};
10
11#[doc = include_str!("../docs/reactivity.md")]
12#[derive(Clone, Copy)]
13pub struct ReactiveContext {
14 scope: ScopeId,
15 inner: GenerationalBox<Inner, SyncStorage>,
16}
17
18impl PartialEq for ReactiveContext {
19 fn eq(&self, other: &Self) -> bool {
20 self.inner.ptr_eq(&other.inner)
21 }
22}
23
24impl Eq for ReactiveContext {}
25
26thread_local! {
27 static CURRENT: RefCell<Vec<ReactiveContext>> = const { RefCell::new(vec![]) };
28}
29
30impl std::fmt::Display for ReactiveContext {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 #[cfg(debug_assertions)]
33 {
34 if let Ok(read) = self.inner.try_read() {
35 if let Some(scope) = read.scope {
36 return write!(f, "ReactiveContext(for scope: {:?})", scope);
37 }
38 return write!(f, "ReactiveContext created at {}", read.origin);
39 }
40 }
41 write!(f, "ReactiveContext")
42 }
43}
44
45impl ReactiveContext {
46 #[track_caller]
48 pub fn new() -> (Self, UnboundedReceiver<()>) {
49 Self::new_with_origin(std::panic::Location::caller())
50 }
51
52 pub fn new_with_origin(
55 origin: &'static std::panic::Location<'static>,
56 ) -> (Self, UnboundedReceiver<()>) {
57 let (tx, rx) = futures_channel::mpsc::unbounded();
58 let callback = move || {
59 if !tx.is_empty() {
61 return;
62 }
63 let _ = tx.unbounded_send(());
64 };
65 let _self = Self::new_with_callback(
66 callback,
67 current_scope_id().unwrap_or_else(|e| panic!("{}", e)),
68 origin,
69 );
70 (_self, rx)
71 }
72
73 pub fn new_with_callback(
75 callback: impl FnMut() + Send + Sync + 'static,
76 scope: ScopeId,
77 #[allow(unused)] origin: &'static std::panic::Location<'static>,
78 ) -> Self {
79 let inner = Inner {
80 self_: None,
81 update: Box::new(callback),
82 subscribers: Default::default(),
83 #[cfg(debug_assertions)]
84 origin,
85 #[cfg(debug_assertions)]
86 scope: None,
87 };
88
89 let owner = scope.owner();
90
91 let self_ = Self {
92 scope,
93 inner: owner.insert(inner),
94 };
95
96 self_.inner.write().self_ = Some(self_);
97
98 self_
99 }
100
101 pub fn current() -> Option<Self> {
103 CURRENT.with(|current| current.borrow().last().cloned())
104 }
105
106 pub(crate) fn new_for_scope(scope: &Scope, runtime: &Runtime) -> Self {
108 let id = scope.id;
109 let sender = runtime.sender.clone();
110 let update_scope = move || {
111 tracing::trace!("Marking scope {:?} as dirty", id);
112 sender.unbounded_send(SchedulerMsg::Immediate(id)).unwrap();
113 };
114
115 let inner = Inner {
117 self_: None,
118 update: Box::new(update_scope),
119 subscribers: Default::default(),
120 #[cfg(debug_assertions)]
121 origin: std::panic::Location::caller(),
122 #[cfg(debug_assertions)]
123 scope: Some(id),
124 };
125
126 let owner = scope.owner();
127
128 let self_ = Self {
129 scope: id,
130 inner: owner.insert(inner),
131 };
132
133 self_.inner.write().self_ = Some(self_);
134
135 self_
136 }
137
138 pub fn clear_subscribers(&self) {
140 #[allow(clippy::mutable_key_type)]
142 let old_subscribers = std::mem::take(&mut self.inner.write().subscribers);
143 for subscriber in old_subscribers {
144 subscriber.0.remove(self);
145 }
146 }
147
148 pub(crate) fn update_subscribers(&self) {
150 #[allow(clippy::mutable_key_type)]
151 let subscribers = &self.inner.read().subscribers;
152 for subscriber in subscribers.iter() {
153 subscriber.0.add(*self);
154 }
155 }
156
157 pub fn reset_and_run_in<O>(&self, f: impl FnOnce() -> O) -> O {
201 self.clear_subscribers();
202 self.run_in(f)
203 }
204
205 pub fn run_in<O>(&self, f: impl FnOnce() -> O) -> O {
210 CURRENT.with(|current| current.borrow_mut().push(*self));
211 let out = f();
212 CURRENT.with(|current| current.borrow_mut().pop());
213 self.update_subscribers();
214 out
215 }
216
217 pub fn mark_dirty(&self) -> bool {
223 if let Ok(mut self_write) = self.inner.try_write() {
224 #[cfg(debug_assertions)]
225 {
226 tracing::trace!(
227 "Marking reactive context created at {} as dirty",
228 self_write.origin
229 );
230 }
231
232 (self_write.update)();
233
234 true
235 } else {
236 false
237 }
238 }
239
240 pub fn subscribe(&self, subscriptions: impl Into<Subscribers>) {
242 match self.inner.try_write() {
243 Ok(mut inner) => {
244 let subscriptions = subscriptions.into();
245 subscriptions.add(*self);
246 inner
247 .subscribers
248 .insert(PointerHash(subscriptions.inner.clone()));
249 }
250 Err(BorrowMutError::Dropped(_)) => {}
252 Err(expect) => {
253 panic!(
254 "Expected to be able to write to reactive context to subscribe, but it failed with: {expect:?}"
255 );
256 }
257 }
258 }
259
260 pub fn origin_scope(&self) -> ScopeId {
262 self.scope
263 }
264}
265
266impl Hash for ReactiveContext {
267 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
268 self.inner.id().hash(state);
269 }
270}
271
272struct PointerHash<T: ?Sized>(Arc<T>);
273
274impl<T: ?Sized> Hash for PointerHash<T> {
275 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
276 std::sync::Arc::<T>::as_ptr(&self.0).hash(state);
277 }
278}
279
280impl<T: ?Sized> PartialEq for PointerHash<T> {
281 fn eq(&self, other: &Self) -> bool {
282 std::sync::Arc::ptr_eq(&self.0, &other.0)
283 }
284}
285
286impl<T: ?Sized> Eq for PointerHash<T> {}
287
288impl<T: ?Sized> Clone for PointerHash<T> {
289 fn clone(&self) -> Self {
290 Self(self.0.clone())
291 }
292}
293
294struct Inner {
295 self_: Option<ReactiveContext>,
296
297 update: Box<dyn FnMut() + Send + Sync>,
299
300 subscribers: HashSet<PointerHash<dyn SubscriberList + Send + Sync>>,
302
303 #[cfg(debug_assertions)]
305 origin: &'static std::panic::Location<'static>,
306
307 #[cfg(debug_assertions)]
308 scope: Option<ScopeId>,
310}
311
312impl Drop for Inner {
313 fn drop(&mut self) {
314 let Some(self_) = self.self_.take() else {
315 return;
316 };
317
318 for subscriber in std::mem::take(&mut self.subscribers) {
319 subscriber.0.remove(&self_);
320 }
321 }
322}
323
324#[derive(Clone)]
326pub struct Subscribers {
327 pub(crate) inner: Arc<dyn SubscriberList + Send + Sync>,
329}
330
331impl Default for Subscribers {
332 fn default() -> Self {
333 Self::new()
334 }
335}
336
337impl Subscribers {
338 pub fn new_noop() -> Self {
340 struct NoopSubscribers;
341 impl SubscriberList for NoopSubscribers {
342 fn add(&self, _subscriber: ReactiveContext) {}
343
344 fn remove(&self, _subscriber: &ReactiveContext) {}
345
346 fn visit(&self, _f: &mut dyn FnMut(&ReactiveContext)) {}
347 }
348 Subscribers {
349 inner: Arc::new(NoopSubscribers),
350 }
351 }
352
353 pub fn new() -> Self {
355 Subscribers {
356 inner: Arc::new(Mutex::new(HashSet::new())),
357 }
358 }
359
360 pub fn add(&self, subscriber: ReactiveContext) {
362 self.inner.add(subscriber);
363 }
364
365 pub fn remove(&self, subscriber: &ReactiveContext) {
367 self.inner.remove(subscriber);
368 }
369
370 pub fn visit(&self, mut f: impl FnMut(&ReactiveContext)) {
372 self.inner.visit(&mut f);
373 }
374}
375
376impl<S: SubscriberList + Send + Sync + 'static> From<Arc<S>> for Subscribers {
377 fn from(inner: Arc<S>) -> Self {
378 Subscribers { inner }
379 }
380}
381
382pub trait SubscriberList: Send + Sync {
384 fn add(&self, subscriber: ReactiveContext);
386
387 fn remove(&self, subscriber: &ReactiveContext);
389
390 fn visit(&self, f: &mut dyn FnMut(&ReactiveContext));
392}
393
394impl SubscriberList for Mutex<HashSet<ReactiveContext>> {
395 fn add(&self, subscriber: ReactiveContext) {
396 if let Ok(mut lock) = self.lock() {
397 lock.insert(subscriber);
398 } else {
399 tracing::warn!("Failed to lock subscriber list to add subscriber: {subscriber}");
400 }
401 }
402
403 fn remove(&self, subscriber: &ReactiveContext) {
404 if let Ok(mut lock) = self.lock() {
405 lock.remove(subscriber);
406 } else {
407 tracing::warn!("Failed to lock subscriber list to remove subscriber: {subscriber}");
408 }
409 }
410
411 fn visit(&self, f: &mut dyn FnMut(&ReactiveContext)) {
412 if let Ok(lock) = self.lock() {
413 lock.iter().for_each(f);
414 } else {
415 tracing::warn!("Failed to lock subscriber list to visit subscribers");
416 }
417 }
418}